http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76417
Revision: 76417 Author: diederik Date: 2010-11-09 22:43:51 +0000 (Tue, 09 Nov 2010) Log Message: ----------- Changed MongoDB schema to handle cases where editors have more than 4Mb of edit observations. Modified Paths: -------------- trunk/tools/editor_trends/construct_datasets.py trunk/tools/editor_trends/database/cache.py trunk/tools/editor_trends/optimize_editors.py trunk/tools/editor_trends/run.py trunk/tools/editor_trends/settings.py trunk/tools/editor_trends/utils/sort.py trunk/tools/editor_trends/utils/utils.py Modified: trunk/tools/editor_trends/construct_datasets.py =================================================================== --- trunk/tools/editor_trends/construct_datasets.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/construct_datasets.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -82,7 +82,7 @@ return headers -def generate_editor_dataset(input_queue, data_queue, pbar, kwargs): +def generate_editor_dataset(input_queue, data_queue, pbar, **kwargs): debug = kwargs.pop('debug') dbname = kwargs.pop('dbname') mongo = db.init_mongo_db(dbname) @@ -143,16 +143,17 @@ 'dbname': dbname, } ids = retrieve_editor_ids_mongo(dbname, 'editors') - chunks = {} - parts = int(round(float(len(ids)) / 1, 0)) - a = 0 - for x in xrange(settings.NUMBER_OF_PROCESSES): - b = a + parts - chunks[x] = ids[a:b] - a = (x + 1) * parts - if a >= len(ids): - break - + chunks = utils.split_list(ids, settings.NUMBER_OF_PROCESSES) +# chunks = {} +# parts = int(round(float(len(ids)) / 1, 0)) +# a = 0 +# for x in xrange(settings.NUMBER_OF_PROCESSES): +# b = a + parts +# chunks[x] = ids[a:b] +# a = (x + 1) * parts +# if a >= len(ids): +# break +# pc.build_scaffolding(pc.load_queue, generate_editor_dataset, chunks, False, False, **kwargs) @@ -169,5 +170,5 @@ if __name__ == '__main__': #generate_editor_dataset_debug('test') - generate_editor_dataset_launcher('test') + generate_editor_dataset_launcher('enwiki') #debug_retrieve_edits_by_contributor_launcher() Modified: trunk/tools/editor_trends/database/cache.py =================================================================== --- trunk/tools/editor_trends/database/cache.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/database/cache.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -64,12 +64,14 @@ return sum([self.editors[k].get('obs', 0) for k in self.editors]) def add(self, key, value): - if key == 'NEXT': + if value == 'NEXT': for editor in self.treshold_editors: - self.update(editor, self.editors[editor]['edits']) + self.insert (editor, self.editors[editor]['edits']) self.n -= self.editors[editor]['obs'] self.number_editors -= 1 del self.editors[editor] + if key in self.editors: + del self.editors[key] self.treshold_editors = set() else: self.cumulative_n += 1 @@ -77,19 +79,33 @@ if key not in self.editors: self.editors[key] = {} self.editors[key]['obs'] = 0 - self.editors[key]['edits'] = [] + self.editors[key]['edits'] = {} + self.add_years(key) self.number_editors += 1 - + id = str(self.editors[key]['obs']) - self.editors[key]['edits'].append(value) + year = str(value['date'].year) + self.editors[key]['edits'][year].append(value) self.editors[key]['obs'] += 1 if self.editors[key]['obs'] == self.treshold: self.treshold_editors.add(key) + def add_years(self, key): + now = datetime.datetime.now().year + 1 + for year in xrange(2001, now): + self.editors[key]['edits'][str(year)] = [] + + def update(self, editor, values): self.collection.update({'editor': editor}, {'$pushAll': {'edits': values}}, upsert=True) + def insert(self, editor, values): + try: + self.collection.insert({'editor': editor, 'edits': values}) + except: + pass + def store(self): utils.store_object(self, settings.BINARY_OBJECT_FILE_LOCATION, self.__repr__()) Modified: trunk/tools/editor_trends/optimize_editors.py =================================================================== --- trunk/tools/editor_trends/optimize_editors.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/optimize_editors.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -25,6 +25,7 @@ import settings from database import db from utils import process_constructor as pc +from utils import utils import construct_datasets @@ -67,20 +68,25 @@ return articles -def optimize_editors(input_queue, result_queue, pbar, kwargs): +def sort_edits(edits): + edits = utils.merge_list(edits) + return sorted(edits, key=itemgetter('date')) + + +def optimize_editors(input_queue, result_queue, pbar, **kwargs): dbname = kwargs.pop('dbname') mongo = db.init_mongo_db(dbname) input = mongo['editors'] output = mongo['dataset'] - mongo.output.ensure_index('editor') - mongo.output.ensure_index('year_joined') + output.ensure_index('editor') + output.ensure_index('year_joined') definition = kwargs.pop('definition') while True: try: id = input_queue.get(block=False) editor = input.find_one({'editor': id}) edits = editor['edits'] - edits = sorted(edits, key=itemgetter('date')) + edits = sort_edits(edits) edit_count = len(edits) new_wikipedian = edits[9]['date'] first_edit = edits[0]['date'] @@ -100,6 +106,7 @@ except Empty: break + def run_optimize_editors(dbname): ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors') kwargs = {'definition': 'traditional', @@ -108,15 +115,16 @@ 'nr_input_processors': 1, 'nr_output_processors': 0, } - chunks = {} - parts = int(round(float(len(ids)) / 1, 0)) - a = 0 - for x in xrange(settings.NUMBER_OF_PROCESSES): - b = a + parts - chunks[x] = ids[a:b] - a = (x + 1) * parts - if a >= len(ids): - break + chunks = utils.split_list(ids, settings.NUMBER_OF_PROCESSES) +# chunks = {} +# parts = int(round(float(len(ids)) / 1, 0)) +# a = 0 +# for x in xrange(settings.NUMBER_OF_PROCESSES): +# b = a + parts +# chunks[x] = ids[a:b] +# a = (x + 1) * parts +# if a >= len(ids): +# break pc.build_scaffolding(pc.load_queue, optimize_editors, chunks, False, False, **kwargs) @@ -131,5 +139,5 @@ if __name__ == '__main__': - debug_optimize_editors('test') - #run_optimize_editors('test') + #debug_optimize_editors('test') + run_optimize_editors('enwiki') Modified: trunk/tools/editor_trends/run.py =================================================================== --- trunk/tools/editor_trends/run.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/run.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -33,5 +33,5 @@ output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted') dbname = 'enwiki' #sort.debug_mergesort_feeder(input, output) -sort.mergesort_launcher(input, output) -#sort.mergesort_external_launcher(dbname, output, output) \ No newline at end of file +#sort.mergesort_launcher(input, output) +sort.mergesort_external_launcher(dbname, output, output) \ No newline at end of file Modified: trunk/tools/editor_trends/settings.py =================================================================== --- trunk/tools/editor_trends/settings.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/settings.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -22,19 +22,19 @@ the datasets as part of the Editor Dynamics and Anti-Vandalism projects. ''' - from multiprocessing import cpu_count import os import sys import platform -#try: -# from pywin import win32file -# '''increase the maximum number of open files on Windows to 1024''' -# win32file._setmaxstdio(1024) -#except ImportError: -# pass try: + from pywin import win32file + '''increase the maximum number of open files on Windows to 1024''' + win32file._setmaxstdio(1024) +except ImportError: + pass + +try: import resource except ImportError: pass @@ -151,7 +151,7 @@ NUMBER_OF_PROCESSES = cpu_count() * 1 #Extensions of ascii files, this is used to determine the filemode to use -ASCII = ['txt', 'csv', 'xml', 'sql'] +ASCII = ['txt', 'csv', 'xml', 'sql', 'json'] WP_DUMP_LOCATION = 'http://download.wikimedia.org' Modified: trunk/tools/editor_trends/utils/sort.py =================================================================== --- trunk/tools/editor_trends/utils/sort.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/utils/sort.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -32,8 +32,8 @@ import utils import process_constructor as pc from database import cache +from database import db - def quick_sort(obs): if obs == []: return [] @@ -106,14 +106,28 @@ mongo.collection.ensure_index('editor') editor_cache = cache.EditorCache(collection) prev_contributor = '' - for line in readline(file): + x = 0 + edits = 0 + editors = set() + for line in readline(fh): contributor = line[0] + if prev_contributor != contributor: - editor_cache.add('NEXT', '') - value = {'date': line[1], 'article': line[2]} + result = editor_cache.add(prev_contributor, 'NEXT') + print 'Stored %s editors' % x + edits = 0 + x += 1 + else: + edits += 1 + if edits == 10: + editors.add(contributor) + date = utils.convert_timestamp_to_date(line[1]) + article_id = int(line[2]) + value = {'date': date, 'article': article_id} editor_cache.add(contributor, value) prev_contributor = contributor fh.close() + utils.store_object(editors, settings.BINARY_OBJECT_FILE_LOCATION, 'editors') def mergesort_external_launcher(dbname, input, output): @@ -126,15 +140,17 @@ chunks = utils.split_list(files, int(x)) '''1st iteration external mergesort''' for chunk in chunks: - filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]] - filename = merge_sorted_files(output, filehandles, chunk) - filehandles = [fh.close() for fh in filehandles] +# filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]] +# filename = merge_sorted_files(output, filehandles, chunk) +# filehandles = [fh.close() for fh in filehandles] + pass '''2nd iteration external mergesort, if necessary''' if len(chunks) > 1: - files = utils.retrieve_file_list(output, 'txt', mask='[merged]') - filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.ENCODING) for file in files] - filename = merge_sorted_files(output, filehandles, 'final') - filehandles = [fh.close() for fh in filehandles] +# files = utils.retrieve_file_list(output, 'txt', mask='[merged]') +# filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.ENCODING) for file in files] +# filename = merge_sorted_files(output, filehandles, 'final') +# filehandles = [fh.close() for fh in filehandles] + filename = 'merged_final.txt' store_editors(output, filename, dbname) Modified: trunk/tools/editor_trends/utils/utils.py =================================================================== --- trunk/tools/editor_trends/utils/utils.py 2010-11-09 22:33:54 UTC (rev 76416) +++ trunk/tools/editor_trends/utils/utils.py 2010-11-09 22:43:51 UTC (rev 76417) @@ -326,6 +326,13 @@ files.append('.'.join(file)) return files +def merge_list(datalist): + merged = [] + for d in datalist: + for x in datalist[d]: + merged.append(x) + return merged + def split_list(datalist, maxval): chunks = {} a = 0 _______________________________________________ MediaWiki-CVS mailing list MediaWiki-CVS@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs