Allow parsing to be done in parallel with -jN. This is handy for testing the code against parallelism problems, and it is also faster!
Issue: #241 Signed-off-by: Daniel Axtens <d...@axtens.net> --- v2: i was having problems with one thread 'running ahead' and merging unrelated spins of poorly threaded series. This gives a 100 message limit to how far any one thread can 'run ahead' of any other. 100 is arbitrary, it's a speed/accuracy trade off. v3: make the locking a bit clearer/more correct/avoid RMW issues, derp. I still think it's probably not worth merging, but I want to have the code out there so people can use it if they want, esp for development. I'm still trying to hammer out a series parsing change that doesn't involve arbitrary retries. I think I'm getting somewhere. --- patchwork/management/commands/parsearchive.py | 147 +++++++++++++----- 1 file changed, 110 insertions(+), 37 deletions(-) diff --git a/patchwork/management/commands/parsearchive.py b/patchwork/management/commands/parsearchive.py index b7f1ea7313c2..45e942034812 100644 --- a/patchwork/management/commands/parsearchive.py +++ b/patchwork/management/commands/parsearchive.py @@ -7,7 +7,9 @@ import logging import mailbox import os import sys +import multiprocessing +from django import db from django.core.management.base import BaseCommand from patchwork import models @@ -16,6 +18,17 @@ from patchwork.parser import DuplicateMailError logger = logging.getLogger(__name__) +TYPE_CONVERSION = { + models.Patch: 0, + models.CoverLetter: 1, + models.Comment: 2, +} +DUPLICATE = 3 +DROPPED = 4 +ERROR = 5 +NUM_TYPES = 6 + +RUN_AHEAD_LIMIT = 100 class Command(BaseCommand): help = 'Parse an mbox archive file and store any patches/comments found.' @@ -28,17 +41,12 @@ class Command(BaseCommand): '--list-id', help='mailing list ID. 
If not supplied, this will be ' 'extracted from the mail headers.') + parser.add_argument( + '--jobs', '-j', + help='process the archive in N parallel jobs', + type=int, default=1) def handle(self, *args, **options): - results = { - models.Patch: 0, - models.CoverLetter: 0, - models.Comment: 0, - } - duplicates = 0 - dropped = 0 - errors = 0 - verbosity = int(options['verbosity']) if not verbosity: level = logging.CRITICAL @@ -53,6 +61,11 @@ class Command(BaseCommand): logger.setLevel(level) logging.getLogger('patchwork.parser').setLevel(level) + jobs = options['jobs'] + if jobs < 1: + logger.error('Invalid number of jobs %d, must be at least 1') + sys.exit(1) + # TODO(stephenfin): Support passing via stdin path = args and args[0] or options['infile'] if not os.path.exists(path): @@ -65,8 +78,6 @@ class Command(BaseCommand): else: mbox = mailbox.Maildir(path, create=False) - count = len(mbox) - # Iterate through the mbox. This will pick up exceptions that are only # thrown when a broken email is found part way through. Without this # block, we'd get the exception thrown in enumerate(mbox) below, which @@ -84,26 +95,39 @@ class Command(BaseCommand): logger.error('Broken mbox/Maildir, aborting') return - logger.info('Parsing %d mails', count) - for i, msg in enumerate(mbox): - try: - obj = parse_mail(msg, options['list_id']) - if obj: - results[type(obj)] += 1 - else: - dropped += 1 - except DuplicateMailError as exc: - duplicates += 1 - logger.warning('Duplicate mail for message ID %s', exc.msgid) - except (ValueError, Exception) as exc: - errors += 1 - logger.warning('Invalid mail: %s', repr(exc)) + # we need to close the db connection so each process gets its own + # see e.g. 
https://stackoverflow.com/a/10684672 + db.connections.close_all() + + threads = [] + processed = multiprocessing.Value('i') + results = multiprocessing.Array('i', NUM_TYPES) + run_ahead_barrier = multiprocessing.Condition() + latest_msg = multiprocessing.Value('i') + for job in range(jobs): + thread = multiprocessing.Process(target=self.parse_mbox, + kwargs={ + 'path': path, + 'list_id': options['list_id'], + 'job': job, + 'num_jobs': jobs, + 'processed': processed, + 'results': results, + 'run_ahead_barrier': run_ahead_barrier, + 'latest_msg': latest_msg, + }) + print("starting", thread) + thread.daemon = True # this makes Ctrl-C work + thread.start() + threads += [thread] - if verbosity < 3 and (i % 10) == 0: - self.stdout.write('%06d/%06d\r' % (i, count), ending='') - self.stdout.flush() - - mbox.close() + count = len(mbox) + for thread in threads: + while thread.is_alive(): + thread.join(1) + if True or verbosity < 3: + self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='') + self.stdout.flush() if not verbosity: return @@ -118,11 +142,60 @@ class Command(BaseCommand): ' %(errors)4d errors\n' 'Total: %(new)s new entries' % { 'total': count, - 'covers': results[models.CoverLetter], - 'patches': results[models.Patch], - 'comments': results[models.Comment], - 'duplicates': duplicates, - 'dropped': dropped, - 'errors': errors, - 'new': count - duplicates - dropped - errors, + 'covers': results[TYPE_CONVERSION[models.CoverLetter]], + 'patches': results[TYPE_CONVERSION[models.Patch]], + 'comments': results[TYPE_CONVERSION[models.Comment]], + 'duplicates': results[DUPLICATE], + 'dropped': results[DROPPED], + 'errors': results[ERROR], + 'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR], }) + + def parse_mbox(self, path, list_id, job, num_jobs, processed, results, + run_ahead_barrier, latest_msg): + if os.path.isfile(path): + mbox = mailbox.mbox(path, create=False) + else: + mbox = mailbox.Maildir(path, create=False) + + count = 
len(mbox) + + if num_jobs == 1: + logger.info('Parsing %d mails', count) + else: + logger.info('Parsing %d total mails, job %d of %d', + count, job + 1, num_jobs) + for i, msg in enumerate(mbox): + + if i % num_jobs != job: + continue + + with run_ahead_barrier: + run_ahead_barrier.wait_for(lambda: i - latest_msg.value <= RUN_AHEAD_LIMIT) + + try: + obj = parse_mail(msg, list_id) + with results.get_lock(): + if obj: + results[TYPE_CONVERSION[type(obj)]] += 1 + else: + results[DROPPED] += 1 + except DuplicateMailError as exc: + with results.get_lock(): + results[DUPLICATE] += 1 + logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid) + except (ValueError, Exception) as exc: + with results.get_lock(): + results[ERROR] += 1 + logger.warning('Invalid mail %d: %s', i, repr(exc)) + + with processed.get_lock(): + processed.value += 1 + + with run_ahead_barrier: + with latest_msg.get_lock(): + if i > latest_msg.value: + latest_msg.value = i + run_ahead_barrier.notify_all() + + mbox.close() -- 2.20.1 _______________________________________________ Patchwork mailing list Patchwork@lists.ozlabs.org https://lists.ozlabs.org/listinfo/patchwork