http://www.mediawiki.org/wiki/Special:Code/MediaWiki/90365
Revision: 90365
Author:   halfak
Date:     2011-06-18 21:07:42 +0000 (Sat, 18 Jun 2011)
Log Message:
-----------
Working dump processor

Modified Paths:
--------------
    trunk/tools/wsor/scripts/page_process.py
    trunk/tools/wsor/scripts/process_dumps.py

Modified: trunk/tools/wsor/scripts/page_process.py
===================================================================
--- trunk/tools/wsor/scripts/page_process.py	2011-06-18 20:37:02 UTC (rev 90364)
+++ trunk/tools/wsor/scripts/page_process.py	2011-06-18 21:07:42 UTC (rev 90365)
@@ -1,2 +1,2 @@
-def process(page, out):
-	out.put([page.getId(), page.getTitle()])
+def process(dump, page):
+	yield (page.getId(), page.getTitle())

Modified: trunk/tools/wsor/scripts/process_dumps.py
===================================================================
--- trunk/tools/wsor/scripts/process_dumps.py	2011-06-18 20:37:02 UTC (rev 90364)
+++ trunk/tools/wsor/scripts/process_dumps.py	2011-06-18 21:07:42 UTC (rev 90365)
@@ -1,5 +1,5 @@
-import sys, logging, re, types, argparse, os
-from multiprocessing import Process, Queue, Lock, cpu_count
+import sys, logging, re, types, argparse, os, subprocess
+from multiprocessing import Process, Queue, Lock, cpu_count, Value
 from Queue import Empty
 from gl import wp
 
@@ -30,23 +30,26 @@
 
 
 class Processor(Process):
-	def __init__(self, fileQueue, process, output, logger):
-		self.fileQueue = fileQueue
-		self.process = process
-		self.output = output
-		self.logger = logger
+	def __init__(self, input, processPage, output, callback, logger):
+		self.input = input
+		self.processPage = processPage
+		self.output = output
+		self.callback = callback
+		self.logger = logger
 		Process.__init__(self)
 
-	def start(self):
+	def run(self):
 		try:
 			while True:
-				fn = self.fileQueue.get(block=False)
+				foo = self.input.qsize()
+				fn = self.input.get(block=False)
 				self.logger.info("Processing dump file %s." % fn)
-				dump = wp.dump.Iterator(fn)
-				for page in dump:
-					self.logger.debug("Processing dump file %s." % fn)
+				dump = wp.dump.Iterator(openDumpFile(fn))
+				for page in dump.readPages():
+					self.logger.debug("Processing page %s:%s." % (page.getId(), page.getTitle()))
 					try:
-						self.process(page, output)
+						for out in self.processPage(dump, page):
+							self.output.put(out)
 					except Exception as e:
 						self.logger.error(
 							"Failed to process page %s:%s - %s" % (
@@ -55,11 +58,16 @@
 								e
 							)
 						)
+
+
+
 		except Empty:
 			self.logger.info("Nothing left to do. Shutting down thread.")
-		except Exception as e:
-			raise e
+		finally:
+			self.callback()
 
+
+
 
 
 def main(args):
@@ -69,37 +77,49 @@
 	logging.basicConfig(
 		level=level,
 		stream=LOGGING_STREAM,
-		format='%(asctime)s %(levelname)-8s %(message)s',
+		format='%(name)s: %(asctime)s %(levelname)-8s %(message)s',
 		datefmt='%b-%d %H:%M:%S'
 	)
-	logging.info("Starting dump processor with %s threads." % args.threads)
+	logging.info("Starting dump processor with %s threads." % min(args.threads, len(args.dump)))
+	for row in process_dumps(args.dump, args.processor.process, args.threads):
+		print('\t'.join(encode(v) for v in row))
+
+def process_dumps(dumps, processPage, threads):
+	input = dumpFiles(dumps)
+	output = Queue(maxsize=10000)
+	running = Value('i', 0)
 
-	dumpQueue = dumpFiles(args.dump)
-	output = SafeOutput(args.out)
-	processors = []
-	for i in range(0, min(args.threads, len(args.dump))):
-		p = Processor(
-			dumpQueue,
-			args.processor.process,
+	def dec(): running.value -= 1
+
+	for i in range(0, min(threads, input.qsize())):
+		running.value += 1
+		Processor(
+			input,
+			processPage,
 			output,
+			dec,
 			logging.getLogger("Process %s" % i)
-		)
-		processors.append(p)
-
+		).start()
+
+
+	#output while processes are running
+	while running.value > 0:
+		try: yield output.get(timeout=.25)
+		except Empty: pass
+
+	#finish yielding output buffer
 	try:
-		for i in processors:
-			processor.join()
-
-	except KeyboardInterrupt:
-		logging
-
+		while True: yield output.get(block=False)
+	except Empty:
+		pass
 
 
 
 EXTENSIONS = {
 	'xml': "cat",
 	'bz2': "bzcat",
-	'7z': "7z e -so"
+	'7z': "7z e -so 2>/dev/null",
+	'lzma':"lzcat"
 }
 
 EXT_RE = re.compile(r'\.([^\.]+)$')
@@ -120,6 +140,16 @@
 	q = Queue()
 	for path in paths: q.put(dumpFile(path))
 	return q
+
+def openDumpFile(path):
+	match = EXT_RE.search(path)
+	ext = match.groups()[0]
+	p = subprocess.Popen(
+		"%s %s" % (EXTENSIONS[ext], path),
+		shell=True,
+		stdout=subprocess.PIPE
+	)
+	return p.stdout
 
 
 if __name__ == "__main__":
@@ -153,4 +183,4 @@
 	)
 	args = parser.parse_args()
 	main(args)
-
+

_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs