http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76401

Revision: 76401
Author:   diederik
Date:     2010-11-09 18:01:23 +0000 (Tue, 09 Nov 2010)
Log Message:
-----------
Fixed nasty bug where in rare cases the process builder would span unlimited 
child processes. 

Modified Paths:
--------------
    trunk/tools/editor_trends/map_wiki_editors.py
    trunk/tools/editor_trends/run.py
    trunk/tools/editor_trends/settings.py
    trunk/tools/editor_trends/utils/models.py
    trunk/tools/editor_trends/utils/process_constructor.py
    trunk/tools/editor_trends/utils/sort.py

Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py       2010-11-09 17:49:33 UTC 
(rev 76400)
+++ trunk/tools/editor_trends/map_wiki_editors.py       2010-11-09 18:01:23 UTC 
(rev 76401)
@@ -120,7 +120,7 @@
                 output.put(vars)
                 vars['date'] = utils.convert_timestamp_to_date(vars['date'])
             elif destination == 'file':
-                data =[]
+                data = []
                 for head in headers:
                     data.append(vars[head])
                 utils.write_list_to_csv(data, output)
@@ -207,10 +207,10 @@
             if pbar:
                 print file, xml_queue.qsize()
                 #utils.update_progressbar(pbar, xml_queue)
-                
+
             if debug:
                 break
-            
+
         except Empty:
             break
 
@@ -234,7 +234,7 @@
     collection = mongo['editors']
     mongo.collection.ensure_index('editor')
     editor_cache = cache.EditorCache(collection)
-    
+
     while True:
         try:
             edit = data_queue.get(block=False)
@@ -305,7 +305,7 @@
     ids = load_bot_ids()
     input = os.path.join(location, language, project)
     output = os.path.join(input, 'txt')
- 
+
     kwargs = {'bots': ids,
               'dbname': language + project,
               'language': language,
@@ -317,22 +317,15 @@
               'input': input,
               'output': output,
               }
-#    chunks = {}
     source = os.path.join(location, language, project)
     files = utils.retrieve_file_list(source, 'xml')
-#    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-#    a = 0
-    
+
     if not os.path.exists(input):
         utils.create_directory(input)
     if not os.path.exists(output):
         utils.create_directory(output)
-        
-#    for x in xrange(settings.NUMBER_OF_PROCESSES):
-#        b = a + parts
-#        chunks[x] = files[a:b]
-#        a = (x + 1) * parts
-    chunks = utils.split_list(files ,settings.NUMBER_OF_PROCESSES)
+
+    chunks = utils.split_list(files , settings.NUMBER_OF_PROCESSES)
     pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, 
**kwargs)
 
 

Modified: trunk/tools/editor_trends/run.py
===================================================================
--- trunk/tools/editor_trends/run.py    2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/run.py    2010-11-09 18:01:23 UTC (rev 76401)
@@ -31,7 +31,7 @@
 from utils import sort
 input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
 output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
-#sort.mergesort_launcher(input, output)
-
-#sort.debug_mergesort(input,output)
-sort.merge_sorted_files_launcher(output, output)
\ No newline at end of file
+dbname = 'enwiki'
+#sort.debug_mergesort_feeder(input, output)
+sort.mergesort_launcher(input, output)
+#sort.mergesort_external_launcher(dbname, output, output)
\ No newline at end of file

Modified: trunk/tools/editor_trends/settings.py
===================================================================
--- trunk/tools/editor_trends/settings.py       2010-11-09 17:49:33 UTC (rev 
76400)
+++ trunk/tools/editor_trends/settings.py       2010-11-09 18:01:23 UTC (rev 
76401)
@@ -27,13 +27,14 @@
 import os
 import sys
 import platform
+#try:
+#    from pywin import win32file
+#    '''increase the maximum number of open files on Windows to 1024'''
+#    win32file._setmaxstdio(1024)
+#except ImportError:
+#    pass
+
 try:
-    from pywin import win32file
-    '''increase the maximum number of open files on Windows to 1024'''
-    win32file._setmaxstdio(1024)
-except ImportError:
-    pass
-try:
     import resource
 except ImportError:
     pass
@@ -47,6 +48,8 @@
     if op() != ('', '', '') and op() != ('', ('', '', ''), ''):
         OS = ops[op]
 
+ARCH = platform.machine()
+
 WORKING_DIRECTORY = os.getcwd()
 IGNORE_DIRS = ['wikistats', 'zips']
 ROOT = '/' if OS != 'Windows' else 'c:\\'
@@ -107,7 +110,12 @@
 # ==64Mb, see 
http://hadoop.apache.org/common/docs/r0.20.0/hdfs_design.html#Large+Data+Sets 
for reason
 MAX_XML_FILE_SIZE = 67108864
 
-MAX_FILES_OPEN = win32file._getmaxstdio() if OS == 'Windows' else 
resource.getrlimit('RLIMIT_NOFILE')
+if OS == 'Windows' and ARCH == 'i386':
+    MAX_FILES_OPEN = win32file._getmaxstdio() 
+elif OS != 'Windows':
+    MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)
+else:
+    MAX_FILES_OPEN = 500
 
 ENCODING = 'utf-8'
 

Modified: trunk/tools/editor_trends/utils/models.py
===================================================================
--- trunk/tools/editor_trends/utils/models.py   2010-11-09 17:49:33 UTC (rev 
76400)
+++ trunk/tools/editor_trends/utils/models.py   2010-11-09 18:01:23 UTC (rev 
76401)
@@ -13,6 +13,9 @@
 '''
 
 __author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2010-11-09'
+__version__ = '0.1'
 
 import multiprocessing
 
@@ -30,11 +33,10 @@
     def run(self):
         proc_name = self.name
         kwargs = {}
-        IGNORE = [self.input_queue, self.result_queue, self.target]
+        IGNORE = ['input_queue', 'result_queue', 'target']
         for kw in self.__dict__:
             if kw not in IGNORE and not kw.startswith('_'):
                 kwargs[kw] = getattr(self, kw)
-
         self.target(self.input_queue, self.result_queue, **kwargs)
 
 
@@ -50,10 +52,9 @@
 
     def run(self):
         proc_name = self.name
-        kwargs= {}
-        IGNORE = [self.result_queue, self.target]
+        kwargs = {}
+        IGNORE = ['result_queue', 'target']
         for kw in self.__dict__:
             if kw not in IGNORE and not kw.startswith('_'):
                 kwargs[kw] = getattr(self, kw)
-        
         self.target(self.result_queue, **kwargs)

Modified: trunk/tools/editor_trends/utils/process_constructor.py
===================================================================
--- trunk/tools/editor_trends/utils/process_constructor.py      2010-11-09 
17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/utils/process_constructor.py      2010-11-09 
18:01:23 UTC (rev 76401)
@@ -80,7 +80,7 @@
                         **kwargs) for i in xrange(nr_input_processors)]
 
     for input_process in input_processes:
-        input_process.start()
+        input_process.run()
     pids = [p.pid for p in input_processes]
     kwargs['pids'] = pids
 

Modified: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py     2010-11-09 17:49:33 UTC (rev 
76400)
+++ trunk/tools/editor_trends/utils/sort.py     2010-11-09 18:01:23 UTC (rev 
76401)
@@ -99,7 +99,7 @@
     fh.close()
 
 
-def store_editors(input, dbname):
+def store_editors(input, filename, dbname):
     fh = utils.create_txt_filehandle(input, filename, 'r', settings.ENCODING)
     mongo = db.init_mongo_db(dbname)
     collection = mongo['editors']
@@ -116,34 +116,29 @@
     fh.close()
 
 
-def merge_sorted_files_launcher(dbname, input, output):
+def mergesort_external_launcher(dbname, input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='')
     x = 0
+    maxval = 99999
     while maxval >= settings.MAX_FILES_OPEN:
         x += 1.0
         maxval = round(len(files) / x)
     chunks = utils.split_list(files, int(x))
     '''1st iteration external mergesort'''
     for chunk in chunks:
-        #filehandles = [utils.create_txt_filehandle(input, file, 'r', 
settings.ENCODING) for file in chunks[chunk]]
-        #filename = merge_sorted_files(output, filehandles, chunk)
-        #filehandles = [fh.close() for fh in filehandles]
-        pass
+        filehandles = [utils.create_txt_filehandle(input, file, 'r', 
settings.ENCODING) for file in chunks[chunk]]
+        filename = merge_sorted_files(output, filehandles, chunk)
+        filehandles = [fh.close() for fh in filehandles]
     '''2nd iteration external mergesort, if necessary'''
     if len(chunks) > 1:
         files = utils.retrieve_file_list(output, 'txt', mask='[merged]')
         filehandles = [utils.create_txt_filehandle(output, file, 'r', 
settings.ENCODING) for file in files]
-        filename = merge_sorted_files(output, filehandles, chunk)
+        filename = merge_sorted_files(output, filehandles, 'final')
         filehandles = [fh.close() for fh in filehandles]
     store_editors(output, filename, dbname)
 
 
-def debug_mergesort(input, output):
-    files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
-    for file in files:
-        pass
-
-def mergesort_feeder(input_queue, **kwargs):
+def mergesort_feeder(input_queue, result_queue, **kwargs):
     input = kwargs.get('input', None)
     output = kwargs.get('output', None)
     while True:
@@ -160,7 +155,6 @@
             break
 
 
-
 def mergesort_launcher(input, output):
     kwargs = {'pbar': True,
               'nr_input_processors': settings.NUMBER_OF_PROCESSES,
@@ -168,23 +162,23 @@
               'input': input,
               'output': output,
               }
-    chunks = {}
-
     files = utils.retrieve_file_list(input, 'txt')
-    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-    a = 0
-
-    for x in xrange(settings.NUMBER_OF_PROCESSES):
-        b = a + parts
-        chunks[x] = files[a:b]
-        a = (x + 1) * parts
-
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
     pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, 
False, **kwargs)
-    merge_sorted_files(input, output)
 
+def debug_mergesort_feeder(input, output):
+    kwargs = {
+              'input': input,
+              'output': output,
+              }
+    files = utils.retrieve_file_list(input, 'txt')
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
+    q = pc.load_queue(chunks[0])
+    mergesort_feeder(q, False, **kwargs)
+
 if __name__ == '__main__':
     input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
     output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
+    dbname = 'enwiki'
     mergesort_launcher(input, output)
-    #debug_mergesort(input, output)
-    #debug_merge_sorted_files(input, output)
+    mergesort_external_launcher(dbname, output, output)


_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to