http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76401
Revision: 76401
Author:   diederik
Date:     2010-11-09 18:01:23 +0000 (Tue, 09 Nov 2010)

Log Message:
-----------
Fixed a nasty bug where, in rare cases, the process builder would spawn unlimited child processes.

Modified Paths:
--------------
    trunk/tools/editor_trends/map_wiki_editors.py
    trunk/tools/editor_trends/run.py
    trunk/tools/editor_trends/settings.py
    trunk/tools/editor_trends/utils/models.py
    trunk/tools/editor_trends/utils/process_constructor.py
    trunk/tools/editor_trends/utils/sort.py

Modified: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
--- trunk/tools/editor_trends/map_wiki_editors.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/map_wiki_editors.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -120,7 +120,7 @@
             output.put(vars)
             vars['date'] = utils.convert_timestamp_to_date(vars['date'])
         elif destination == 'file':
-            data =[]
+            data = []
             for head in headers:
                 data.append(vars[head])
             utils.write_list_to_csv(data, output)
@@ -207,10 +207,10 @@
             if pbar:
                 print file, xml_queue.qsize()
                 #utils.update_progressbar(pbar, xml_queue)
-
+
             if debug:
                 break
-
+
         except Empty:
             break
 
@@ -234,7 +234,7 @@
     collection = mongo['editors']
     mongo.collection.ensure_index('editor')
     editor_cache = cache.EditorCache(collection)
-
+
     while True:
         try:
             edit = data_queue.get(block=False)
@@ -305,7 +305,7 @@
     ids = load_bot_ids()
     input = os.path.join(location, language, project)
     output = os.path.join(input, 'txt')
-
+
     kwargs = {'bots': ids,
               'dbname': language + project,
               'language': language,
@@ -317,22 +317,15 @@
               'input': input,
               'output': output,
               }
-#    chunks = {}
     source = os.path.join(location, language, project)
     files = utils.retrieve_file_list(source, 'xml')
-#    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-#    a = 0
-
+
     if not os.path.exists(input):
         utils.create_directory(input)
     if not os.path.exists(output):
         utils.create_directory(output)
-
-#    for x in xrange(settings.NUMBER_OF_PROCESSES):
-#        b = a + parts
-#        chunks[x] = files[a:b]
-#        a = (x + 1) * parts
-    chunks = utils.split_list(files ,settings.NUMBER_OF_PROCESSES)
+
+    chunks = utils.split_list(files , settings.NUMBER_OF_PROCESSES)
     pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs)
 
 

Modified: trunk/tools/editor_trends/run.py
===================================================================
--- trunk/tools/editor_trends/run.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/run.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -31,7 +31,7 @@
 from utils import sort
 input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
 output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
-#sort.mergesort_launcher(input, output)
-
-#sort.debug_mergesort(input,output)
-sort.merge_sorted_files_launcher(output, output)
\ No newline at end of file
+dbname = 'enwiki'
+#sort.debug_mergesort_feeder(input, output)
+sort.mergesort_launcher(input, output)
+#sort.mergesort_external_launcher(dbname, output, output)
\ No newline at end of file

Modified: trunk/tools/editor_trends/settings.py
===================================================================
--- trunk/tools/editor_trends/settings.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/settings.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -27,13 +27,14 @@
 import os
 import sys
 import platform
+#try:
+#    from pywin import win32file
+#    '''increase the maximum number of open files on Windows to 1024'''
+#    win32file._setmaxstdio(1024)
+#except ImportError:
+#    pass
+
 try:
-    from pywin import win32file
-    '''increase the maximum number of open files on Windows to 1024'''
-    win32file._setmaxstdio(1024)
-except ImportError:
-    pass
-try:
     import resource
 except ImportError:
     pass
@@ -47,6 +48,8 @@
     if op() != ('', '', '') and op() != ('', ('', '', ''), ''):
         OS = ops[op]
 
+ARCH = platform.machine()
+
 WORKING_DIRECTORY = os.getcwd()
 IGNORE_DIRS = ['wikistats', 'zips']
 ROOT = '/' if OS != 'Windows' else 'c:\\'
@@ -107,7 +110,12 @@
 # ==64Mb, see http://hadoop.apache.org/common/docs/r0.20.0/hdfs_design.html#Large+Data+Sets for reason
 MAX_XML_FILE_SIZE = 67108864
 
-MAX_FILES_OPEN = win32file._getmaxstdio() if OS == 'Windows' else resource.getrlimit('RLIMIT_NOFILE')
+if OS == 'Windows' and ARCH == 'i386':
+    MAX_FILES_OPEN = win32file._getmaxstdio()
+elif OS != 'Windows':
+    MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)
+else:
+    MAX_FILES_OPEN = 500
 
 ENCODING = 'utf-8'

Modified: trunk/tools/editor_trends/utils/models.py
===================================================================
--- trunk/tools/editor_trends/utils/models.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/utils/models.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -13,6 +13,9 @@
 '''
 __author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2010-11-09'
+__version__ = '0.1'
 
 import multiprocessing
 
 
@@ -30,11 +33,10 @@
 
     def run(self):
         proc_name = self.name
         kwargs = {}
-        IGNORE = [self.input_queue, self.result_queue, self.target]
+        IGNORE = ['input_queue', 'result_queue', 'target']
         for kw in self.__dict__:
             if kw not in IGNORE and not kw.startswith('_'):
                 kwargs[kw] = getattr(self, kw)
-
         self.target(self.input_queue, self.result_queue, **kwargs)
 
@@ -50,10 +52,9 @@
 
     def run(self):
         proc_name = self.name
-        kwargs= {}
-        IGNORE = [self.result_queue, self.target]
+        kwargs = {}
+        IGNORE = ['result_queue', 'target']
         for kw in self.__dict__:
             if kw not in IGNORE and not kw.startswith('_'):
                 kwargs[kw] = getattr(self, kw)
-
         self.target(self.result_queue, **kwargs)

Modified: trunk/tools/editor_trends/utils/process_constructor.py
===================================================================
--- trunk/tools/editor_trends/utils/process_constructor.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/utils/process_constructor.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -80,7 +80,7 @@
                                     **kwargs) for i in xrange(nr_input_processors)]
 
     for input_process in input_processes:
-        input_process.start()
+        input_process.run()
 
     pids = [p.pid for p in input_processes]
     kwargs['pids'] = pids

Modified: trunk/tools/editor_trends/utils/sort.py
===================================================================
--- trunk/tools/editor_trends/utils/sort.py	2010-11-09 17:49:33 UTC (rev 76400)
+++ trunk/tools/editor_trends/utils/sort.py	2010-11-09 18:01:23 UTC (rev 76401)
@@ -99,7 +99,7 @@
     fh.close()
 
 
-def store_editors(input, dbname):
+def store_editors(input, filename, dbname):
     fh = utils.create_txt_filehandle(input, filename, 'r', settings.ENCODING)
     mongo = db.init_mongo_db(dbname)
     collection = mongo['editors']
@@ -116,34 +116,29 @@
     fh.close()
 
 
-def merge_sorted_files_launcher(dbname, input, output):
+def mergesort_external_launcher(dbname, input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='')
     x = 0
+    maxval = 99999
     while maxval >= settings.MAX_FILES_OPEN:
         x += 1.0
         maxval = round(len(files) / x)
     chunks = utils.split_list(files, int(x))
     '''1st iteration external mergesort'''
     for chunk in chunks:
-        #filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]]
-        #filename = merge_sorted_files(output, filehandles, chunk)
-        #filehandles = [fh.close() for fh in filehandles]
-        pass
+        filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]]
+        filename = merge_sorted_files(output, filehandles, chunk)
+        filehandles = [fh.close() for fh in filehandles]
     '''2nd iteration external mergesort, if necessary'''
     if len(chunks) > 1:
         files = utils.retrieve_file_list(output, 'txt', mask='[merged]')
         filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.ENCODING) for file in files]
-        filename = merge_sorted_files(output, filehandles, chunk)
+        filename = merge_sorted_files(output, filehandles, 'final')
         filehandles = [fh.close() for fh in filehandles]
     store_editors(output, filename, dbname)
 
 
-def debug_mergesort(input, output):
-    files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
-    for file in files:
-        pass
-
-def mergesort_feeder(input_queue, **kwargs):
+def mergesort_feeder(input_queue, result_queue, **kwargs):
     input = kwargs.get('input', None)
     output = kwargs.get('output', None)
     while True:
@@ -160,7 +155,6 @@
             break
 
 
-
 def mergesort_launcher(input, output):
     kwargs = {'pbar': True,
               'nr_input_processors': settings.NUMBER_OF_PROCESSES,
@@ -168,23 +162,23 @@
               'input': input,
               'output': output,
               }
-    chunks = {}
-    files = utils.retrieve_file_list(input, 'txt')
-    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-    a = 0
-
-    for x in xrange(settings.NUMBER_OF_PROCESSES):
-        b = a + parts
-        chunks[x] = files[a:b]
-        a = (x + 1) * parts
-
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
     pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
-    merge_sorted_files(input, output)
 
 
+def debug_mergesort_feeder(input, output):
+    kwargs = {
+        'input': input,
+        'output': output,
+    }
+    files = utils.retrieve_file_list(input, 'txt')
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
+    q = pc.load_queue(chunks[0])
+    mergesort_feeder(q, False, **kwargs)
+
 if __name__ == '__main__':
     input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
     output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
+    dbname = 'enwiki'
    mergesort_launcher(input, output)
-    #debug_mergesort(input, output)
-    #debug_merge_sorted_files(input, output)
+    mergesort_external_launcher(dbname, output, output)

_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs
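
For context, the pattern this changeset converges on (splitting the file list into a fixed number of chunks via utils.split_list so that only settings.NUMBER_OF_PROCESSES workers are ever started) can be illustrated with the minimal, self-contained sketch below. The names split_list, worker, NUMBER_OF_PROCESSES and the sample file list are hypothetical stand-ins for illustration, not the project's actual code.

# Illustrative sketch only, not part of revision 76401: it demonstrates the
# bounded-worker pattern, with split_list and worker as hypothetical stand-ins
# for utils.split_list and the real consumers (parse_editors, mergesort_feeder).
import multiprocessing

NUMBER_OF_PROCESSES = 4  # stand-in for settings.NUMBER_OF_PROCESSES


def split_list(items, parts):
    '''Divide items into `parts` roughly equal chunks, keyed 0..parts-1.'''
    chunks = {}
    size = max(1, len(items) // parts)
    for x in range(parts):
        # the last chunk absorbs any remainder
        chunks[x] = items[x * size:(x + 1) * size] if x < parts - 1 else items[x * size:]
    return chunks


def worker(files):
    '''Process one chunk of files inside a single child process.'''
    for f in files:
        pass  # parse / sort one file here


if __name__ == '__main__':
    files = ['dump_%d.xml' % i for i in range(1000)]  # hypothetical input list
    chunks = split_list(files, NUMBER_OF_PROCESSES)
    processes = [multiprocessing.Process(target=worker, args=(chunks[x],))
                 for x in range(NUMBER_OF_PROCESSES)]
    for p in processes:
        p.start()  # at most NUMBER_OF_PROCESSES children run concurrently
    for p in processes:
        p.join()

Because the chunking happens once, up front, the number of child processes is fixed by configuration rather than by the size of the input, which is one way to keep the process count bounded.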