http://www.mediawiki.org/wiki/Special:Code/MediaWiki/90365

Revision: 90365
Author:   halfak
Date:     2011-06-18 21:07:42 +0000 (Sat, 18 Jun 2011)
Log Message:
-----------
Working dump processor

Modified Paths:
--------------
    trunk/tools/wsor/scripts/page_process.py
    trunk/tools/wsor/scripts/process_dumps.py

Modified: trunk/tools/wsor/scripts/page_process.py
===================================================================
--- trunk/tools/wsor/scripts/page_process.py    2011-06-18 20:37:02 UTC (rev 90364)
+++ trunk/tools/wsor/scripts/page_process.py    2011-06-18 21:07:42 UTC (rev 90365)
@@ -1,2 +1,2 @@
-def process(page, out):
-       out.put([page.getId(), page.getTitle()])
+def process(dump, page):
+       yield (page.getId(), page.getTitle())

Modified: trunk/tools/wsor/scripts/process_dumps.py
===================================================================
--- trunk/tools/wsor/scripts/process_dumps.py   2011-06-18 20:37:02 UTC (rev 90364)
+++ trunk/tools/wsor/scripts/process_dumps.py   2011-06-18 21:07:42 UTC (rev 90365)
@@ -1,5 +1,5 @@
-import sys, logging, re, types, argparse, os
-from multiprocessing import Process, Queue, Lock, cpu_count
+import sys, logging, re, types, argparse, os, subprocess
+from multiprocessing import Process, Queue, Lock, cpu_count, Value
 from Queue import Empty
 from gl import wp
 
@@ -30,23 +30,26 @@
 
 class Processor(Process):
        
-       def __init__(self, fileQueue, process, output, logger):
-               self.fileQueue = fileQueue
-               self.process   = process
-               self.output    = output
-               self.logger    = logger
+       def __init__(self, input, processPage, output, callback, logger):
+               self.input       = input
+               self.processPage = processPage
+               self.output      = output
+               self.callback    = callback
+               self.logger      = logger
                Process.__init__(self)
        
-       def start(self):
+       def run(self):
                try:
                        while True:
-                               fn = self.fileQueue.get(block=False)
+                               foo = self.input.qsize()
+                               fn = self.input.get(block=False)
                                self.logger.info("Processing dump file %s." % 
fn)
-                               dump = wp.dump.Iterator(fn)
-                               for page in dump:
-                                       self.logger.debug("Processing dump file 
%s." % fn)
+                               dump = wp.dump.Iterator(openDumpFile(fn))
+                               for page in dump.readPages():
+                                       self.logger.debug("Processing page 
%s:%s." % (page.getId(), page.getTitle()))
                                        try:
-                                               self.process(page, output)
+                                               for out in 
self.processPage(dump, page):
+                                                       self.output.put(out)
                                        except Exception as e:
                                                self.logger.error(
                                                        "Failed to process page 
%s:%s - %s" % (
@@ -55,11 +58,16 @@
                                                                e
                                                        )
                                                )
+                                       
+                               
                        
+                       
                except Empty:
                        self.logger.info("Nothing left to do.  Shutting down 
thread.")
-               except Exception as e:
-                       raise e
+               finally:
+                       self.callback()
+               
+       
 
 
 def main(args):
@@ -69,37 +77,49 @@
        logging.basicConfig(
                level=level,
                stream=LOGGING_STREAM,
-               format='%(asctime)s %(levelname)-8s %(message)s',
+               format='%(name)s: %(asctime)s %(levelname)-8s %(message)s',
                datefmt='%b-%d %H:%M:%S'
        )
-       logging.info("Starting dump processor with %s threads." % args.threads)
+       logging.info("Starting dump processor with %s threads." % 
min(args.threads, len(args.dump)))
+       for row in process_dumps(args.dump, args.processor.process, 
args.threads):
+               print('\t'.join(encode(v) for v in row))
+
+def process_dumps(dumps, processPage, threads):
+       input       = dumpFiles(dumps)
+       output      = Queue(maxsize=10000)
+       running     = Value('i', 0)
        
-       dumpQueue = dumpFiles(args.dump)
-       output = SafeOutput(args.out)
-       processors = []
-       for i in range(0, min(args.threads, len(args.dump))):
-               p = Processor(
-                       dumpQueue, 
-                       args.processor.process, 
+       def dec(): running.value -= 1
+       
+       for i in range(0, min(threads, input.qsize())):
+               running.value += 1
+               Processor(
+                       input, 
+                       processPage,
                        output, 
+                       dec,
                        logging.getLogger("Process %s" % i)
-               )
-               processors.append(p)
-               
+               ).start()
+       
+       
+       #output while processes are running
+       while running.value > 0:
+               try:          yield output.get(timeout=.25)
+               except Empty: pass
+       
+       #finish yielding output buffer
        try:
-               for i in processors:
-                       processor.join()
-               
-       except KeyboardInterrupt:
-               logging
-               
+               while True: yield output.get(block=False) 
+       except Empty: 
+               pass
        
 
 
 EXTENSIONS = {
        'xml': "cat",
        'bz2': "bzcat",
-       '7z':  "7z e -so"
+       '7z':  "7z e -so 2>/dev/null",
+       'lzma':"lzcat"
 }
 
 EXT_RE = re.compile(r'\.([^\.]+)$')
@@ -120,6 +140,16 @@
        q = Queue()
        for path in paths: q.put(dumpFile(path))
        return q
+
+def openDumpFile(path):
+       match = EXT_RE.search(path)
+       ext = match.groups()[0]
+       p = subprocess.Popen(
+               "%s %s" % (EXTENSIONS[ext], path), 
+               shell=True, 
+               stdout=subprocess.PIPE
+       )
+       return p.stdout
        
 
 if __name__ == "__main__":
@@ -153,4 +183,4 @@
        )
        args = parser.parse_args()
        main(args)
-       
+


_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to