Newer
Older
mailpiler / util / rtindex.py
@Janos SUTO Janos SUTO on 27 Aug 2022 3 KB rtindex.py fixes
#!/usr/bin/python3

import configparser
import MySQLdb as dbapi
import argparse
import getpass
import os
import sys
import syslog
import time

SQL_SELECT_QUERY = "SELECT id, `from`, `to`, fromdomain, todomain, subject, " + \
                   "arrived, sent, body, size, direction, folder, attachments, " + \
                   "attachment_types FROM sph_index"
SQL_INSERT_QUERY = "INSERT INTO piler1 (id, sender, rcpt, senderdomain, " + \
                   "rcptdomain, subject, arrived, sent, body, size, direction, " + \
                   "folder, attachments, attachment_types) VALUES (%s, %s, %s, " + \
                   "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
SQL_DELETE_QUERY = "DELETE FROM sph_index WHERE id IN (%s)"
SLEEP_DELAY = 5

opts = {}


def read_options(filename="", opts={}):
    s = "[piler]\n" + open(filename, 'r').read()
    config = configparser.ConfigParser()
    config.read_string(s)

    if config.has_option('piler', 'mysqlhost'):
        opts['dbhost'] = config.get('piler', 'mysqlhost')
    else:
        opts['dbhost'] = 'localhost'

    opts['username'] = config.get('piler', 'mysqluser')
    opts['password'] = config.get('piler', 'mysqlpwd')
    opts['database'] = config.get('piler', 'mysqldb')


def process_batch(opts):
    try:
        opts['db'] = dbapi.connect(opts['dbhost'], opts['username'],
                                   opts['password'], opts['database'])

        cursor = opts['db'].cursor()

        while True:
            cursor.execute(SQL_SELECT_QUERY)
            rows = cursor.fetchmany(opts['batch_size'])
            if rows == ():
                time.sleep(SLEEP_DELAY)
                break

            ids = [x[0] for x in rows]

            opts['sphx'] = dbapi.connect(host=opts['sphinx_host'],
                                         port=opts['sphinx_port'])
            sphx_cursor = opts['sphx'].cursor()

            sphx_cursor.executemany(SQL_INSERT_QUERY, rows)
            opts['sphx'].commit()
            opts['sphx'].close()

            syslog.syslog("%d records inserted" % (sphx_cursor.rowcount))

            format = ", ".join(['%s'] * len(ids))
            cursor.execute(SQL_DELETE_QUERY % (format), ids)
            opts['db'].commit()

    except dbapi.DatabaseError as e:
        syslog.syslog("Error %s" % e)
        time.sleep(SLEEP_DELAY)

    if opts['db']:
        opts['db'].close()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=str, help="piler.conf path",
                        default="/etc/piler/piler.conf")
    parser.add_argument("-b", "--batch-size", type=int, help="batch size " +
                        "to process", default=1000)
    parser.add_argument("-s", "--sphinx", type=str, help="sphinx server",
                        default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, help="sphinx sql port",
                        default=9306)
    parser.add_argument("-d", "--dry-run", help="dry run", action='store_true')
    parser.add_argument("-v", "--verbose", help="verbose mode",
                        action='store_true')

    args = parser.parse_args()

    if getpass.getuser() not in ['root', 'piler']:
        print("Please run me as user 'piler'")
        sys.exit(1)

    opts['dry_run'] = args.dry_run
    opts['verbose'] = args.verbose
    opts['sphinx_host'] = args.sphinx
    opts['sphinx_port'] = args.port
    opts['batch_size'] = args.batch_size
    opts['db'] = None
    opts['sphx'] = None

    syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)

    read_options(args.config, opts)

    while True:
        process_batch(opts)


if __name__ == "__main__":
    main()