http://www.mediawiki.org/wiki/Special:Code/MediaWiki/76846
Revision: 76846
Author: diederik
Date: 2010-11-16 23:13:29 +0000 (Tue, 16 Nov 2010)

Log Message:
-----------
Major refactoring:
* moved files to the etl (extract-transform-load) directory.
This commit still contains some small issues.

Added Paths:
-----------
    trunk/tools/editor_trends/etl/
    trunk/tools/editor_trends/etl/__init__.py
    trunk/tools/editor_trends/etl/bots.py
    trunk/tools/editor_trends/etl/chunker.py
    trunk/tools/editor_trends/etl/construct_datasets.py
    trunk/tools/editor_trends/etl/extract.py
    trunk/tools/editor_trends/etl/loader.py
    trunk/tools/editor_trends/etl/optimize_editors.py
    trunk/tools/editor_trends/etl/xml2pig.py

Property changes on: trunk/tools/editor_trends/etl/__init__.py
___________________________________________________________________
Added: svn:eol-style
+ native

Added: trunk/tools/editor_trends/etl/bots.py
===================================================================
--- trunk/tools/editor_trends/etl/bots.py	(rev 0)
+++ trunk/tools/editor_trends/etl/bots.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,123 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+
+import os
+import cStringIO
+import xml.etree.cElementTree as cElementTree
+
+
+import configuration
+settings = configuration.Settings()
+from wikitree import xml
+from database import db
+from database import db_settings
+from utils import utils
+from utils import process_constructor as pc
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    pass
+
+
+def create_bot_ids_db_mongo():
+    ids = utils.create_dict_from_csv_file(add_id_to_botnames, settings.encoding)
+    mongo = db.init_mongo_db('bots')
+    collection = mongo['ids']
+
+    db.remove_documents_from_mongo_db(collection, None)
+
+    for id, name in ids.iteritems():
+        collection.insert({'id': id, 'name': name})
+
+    print collection.count()
+
+
+def lookup_username(input_queue, result_queue, progressbar, bots, debug=False):
+    '''
+    This function finds the ids belonging to the different bots that patrol
+    the Wikipedia sites.
+    @input_queue contains a list of xml files to parse
+
+    @result_queue should be set to false as the results are directly written
+    to a csv file.
+
+    @progressbar depends on settings
+
+    @bots is a dictionary containing the names of the bots to look up
+    '''
+    bots = bots['bots']
+
+    if settings.debug:
+        messages = {}
+
+    while True:
+        if debug:
+            file = input_queue
+        else:
+            file = input_queue.get(block=False)
+
+        if file is None:
+            break
+
+        data = xml.read_input(utils.open_txt_file(os.path.join(settings.input_location, file),
+                                                  'r', encoding=settings.encoding))
+
+        for raw_data in data:
+            xml_buffer = cStringIO.StringIO()
+            raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
+            raw_data = ''.join(raw_data)
+            raw_data = raw_data.encode('utf-8')
+            xml_buffer.write(raw_data)
+
+            try:
+                xml_nodes = cElementTree.XML(xml_buffer.getvalue())
+                revisions = xml_nodes.findall('revision')
+                for revision in revisions:
+                    contributor = xml.retrieve_xml_node(revision, 'contributor')
+                    username = contributor.find('username')
+                    if username is None:
+                        continue
+                    username = xml.extract_text(username)
+
+                    if username in bots:
+                        id = contributor.find('id')
+                        id = xml.extract_text(id)
+                        utils.write_data_to_csv({username: [id]}, add_id_to_botnames, settings.encoding)
+                        bots.pop(username)
+                        if not bots:
+                            print 'Mission accomplished'
+                            return
+            except Exception, error:
+                print error
+                if settings.debug:
+                    messages = utils.track_errors(xml_buffer, error, file, messages)
+
+    if settings.debug:
+        utils.report_error_messages(messages, lookup_username)
+
+
+if __name__ == '__main__':
+    #debug()
+    #add_id_to_botnames()
+    create_bot_ids_db_mongo()

Property changes on: trunk/tools/editor_trends/etl/bots.py
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:eol-style
+ native
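For reference, lookup_username walks revision fragments in the standard MediaWiki export shape. A minimal, self-contained illustration of that parsing step (the sample XML, bot name and id are illustrative, not taken from a dump):

    import xml.etree.cElementTree as cElementTree

    bots = {'ExampleBot': []}                       # name -> ids, as in the bots dictionary
    sample = ('<revision>'
              '<timestamp>2010-11-16T23:13:29Z</timestamp>'
              '<contributor><username>ExampleBot</username><id>42</id></contributor>'
              '</revision>')

    revision = cElementTree.XML(sample)
    contributor = revision.find('contributor')
    username = contributor.find('username')
    if username is not None and username.text in bots:
        print contributor.find('id').text           # -> 42
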
Added: trunk/tools/editor_trends/etl/chunker.py
===================================================================
--- trunk/tools/editor_trends/etl/chunker.py	(rev 0)
+++ trunk/tools/editor_trends/etl/chunker.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,211 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-10-21'
+__version__ = '0.1'
+
+import xml.etree.cElementTree as cElementTree
+import sys
+import codecs
+import re
+import json
+import os
+
+import progressbar
+
+sys.path.append('..')
+import configuration
+from utils import utils
+from wikitree import xml
+settings = configuration.Settings()
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    pass
+
+
+RE_NUMERIC_CHARACTER = re.compile(r'&#(\d+);')
+
+
+def remove_numeric_character_references(text):
+    return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode('utf-8')
+
+
+def lenient_deccharref(m):
+    try:
+        return unichr(int(m.group(1)))
+    except ValueError:
+        '''
+        A few articles raise a ValueError here because this is a narrow
+        Python build (UCS2) instead of a wide build (UCS4). The quick fix is
+        to return an empty string; the real solution is to rebuild Python
+        with UCS4 support.
+        '''
+        return ''
+
+
+def remove_namespace(element, namespace):
+    '''Remove the namespace from the XML document.'''
+    ns = u'{%s}' % namespace
+    nsl = len(ns)
+    for elem in element.getiterator():
+        if elem.tag.startswith(ns):
+            elem.tag = elem.tag[nsl:]
+    return element
+
+
+def load_namespace(language):
+    file = '%s_ns.json' % language
+    fh = utils.create_txt_filehandle(settings.namespace_location, file, 'r', settings.encoding)
+    ns = json.load(fh)
+    fh.close()
+    ns = ns['query']['namespaces']
+    return ns
+
+
+def build_namespaces_locale(namespaces):
+    '''
+    Construct a list of all the non-main namespaces.
+    '''
+    ns = []
+    for namespace in namespaces:
+        value = namespaces[namespace].get(u'*', None)
+        if value is not None and value != '':
+            ns.append(value)
+    return ns
+
+
+def parse_comments(elem, function):
+    revisions = elem.findall('revision')
+    for revision in revisions:
+        comment = revision.find('comment')
+        if comment is not None and comment.text is not None:
+            comment.text = function(comment.text)
+    return elem
+
+
+def is_article_main_namespace(elem, namespace):
+    '''
+    Checks whether the article belongs to the main namespace.
+    '''
+    title = elem.find('title').text
+    for ns in namespace:
+        if title.startswith(ns):
+            return False
+    return True
+
+
+def write_xml_file(element, fh, counter, language):
+    '''Get a file handle and write the xml element to file.'''
+    size = len(cElementTree.tostring(element))
+    fh, counter = create_file_handle(fh, counter, size, language)
+    try:
+        fh.write(cElementTree.tostring(element))
+    except MemoryError:
+        print 'Add error capturing logic'
+    fh.write('\n')
+    return fh, counter
+
+
+def create_file_handle(fh, counter, size, language):
+    '''Create a file handle if none is supplied or if file size > max file size.'''
+    if not counter:
+        counter = 0
+    if not fh:
+        path = os.path.join(settings.input_location, language, '%s.xml' % counter)
+        fh = codecs.open(path, 'w', encoding=settings.encoding)
+        return fh, counter
+    elif (fh.tell() + size) > settings.max_xmlfile_size:
+        # assumption: a max-chunk-size setting was intended here; the original
+        # compared against settings.binary_location, which is a directory path
+        print 'Created chunk %s' % counter
+        fh.close()
+        counter += 1
+        path = os.path.join(settings.input_location, language, '%s.xml' % counter)
+        fh = codecs.open(path, 'w', encoding=settings.encoding)
+        return fh, counter
+    else:
+        return fh, counter
+
+
+def flatten_xml_elements(data, page):
+    flat = []
+    for x, elems in enumerate(data):
+        flat.append([page])
+        for elem in elems:
+            if elem.tag != 'id':
+                if len(elem.getchildren()) > 0:
+                    for el in elem.getchildren():
+                        flat[x].append(xml.extract_text(el, None))
+                else:
+                    flat[x].append(xml.extract_text(elem, None))
+    return flat
+
+
+def split_file(output, input, project, language_code, language, format='xml'):
+    '''Reads an XML file and splits it into N chunks.'''
+    output = os.path.join(output, language_code, project)
+    settings.verify_environment([output])
+    if format == 'xml':
+        fh = None
+    else:
+        f = input.replace('.xml', '')
+        fh = utils.create_txt_filehandle(output, '%s.tsv' % f, 'w', settings.encoding)
+
+    ns = load_namespace(language_code)
+    ns = build_namespaces_locale(ns)
+
+    counter = None
+    tag = '{%s}page' % settings.xml_namespace
+
+    context = cElementTree.iterparse(input, events=('start', 'end'))
+    context = iter(context)
+    event, root = context.next()  # get the root element of the XML doc
+
+    try:
+        for event, elem in context:
+            if event == 'end':
+                if elem.tag == tag:
+                    elem = remove_namespace(elem, settings.xml_namespace)
+                    if is_article_main_namespace(elem, ns):
+                        page = elem.find('id').text
+                        elem = parse_comments(elem, remove_numeric_character_references)
+                        if format == 'xml':
+                            fh, counter = write_xml_file(elem, fh, counter, language_code)
+                        else:
+                            data = [el.getchildren() for el in elem if el.tag == 'revision']
+                            data = flatten_xml_elements(data, page)
+                            utils.write_list_to_csv(data, fh, recursive=False, newline=True)
+                    root.clear()  # when done parsing a section, clear the tree to save memory
    except SyntaxError:
+        f = utils.create_txt_filehandle(settings.log_location, 'split_xml', 'w', settings.encoding)
+        f.write(cElementTree.tostring(elem))
+        f.close()
+    finally:
+        if fh is not None:
+            fh.close()
+
+
+if __name__ == "__main__":
+    kwargs = {'output': settings.input_location,
+              'input': settings.input_filename,
+              'project': 'wiki',
+              'language_code': 'en',
+              'format': 'tsv'
+              }
+    split_file(**kwargs)

Property changes on: trunk/tools/editor_trends/etl/chunker.py
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:eol-style
+ native
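The numeric-character-reference helpers in chunker.py are self-contained; a quick standalone illustration of what they do (the sample input is illustrative):

    # -*- coding: utf-8 -*-
    import re

    RE_NUMERIC_CHARACTER = re.compile(r'&#(\d+);')

    def lenient_deccharref(m):
        try:
            return unichr(int(m.group(1)))
        except ValueError:  # out-of-range or narrow-build code point
            return ''

    text = 'caf&#233; &#1114112;'  # e-acute, then a code point beyond U+10FFFF
    print RE_NUMERIC_CHARACTER.sub(lenient_deccharref, text).encode('utf-8')  # -> cafe with acute accent
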
Added: trunk/tools/editor_trends/etl/construct_datasets.py
===================================================================
--- trunk/tools/editor_trends/etl/construct_datasets.py	(rev 0)
+++ trunk/tools/editor_trends/etl/construct_datasets.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,255 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-10-21'
+__version__ = '0.1'
+
+from multiprocessing import Queue
+from Queue import Empty
+import datetime
+from dateutil.relativedelta import relativedelta
+
+import progressbar
+
+import configuration
+settings = configuration.Settings()
+from utils import models, utils
+from database import db
+from utils import process_constructor as pc
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    pass
+
+
+def retrieve_editor_ids_mongo(dbname, collection):
+    if utils.check_file_exists(settings.binary_location, 'editors.bin'):
+        ids = utils.load_object(settings.binary_location, 'editors.bin')
+    else:
+        mongo = db.init_mongo_db(dbname)
+        editors = mongo[collection]
+        ids = editors.distinct('editor')
+        utils.store_object(ids, settings.binary_location, 'editors')
+    return ids
+
+
+def expand_edits(edits):
+    data = []
+    for edit in edits:
+        data.append(edit['date'])
+    return data
+
+
+def expand_observations(obs, vars_to_expand):
+    for var in vars_to_expand:
+        if var == 'edits':
+            obs[var] = expand_edits(obs[var])
+        elif var == 'edits_by_year':
+            keys = obs[var].keys()
+            keys.sort()
+            edits = []
+            for key in keys:
+                edits.append(str(obs[var][key]))
+            obs[var] = edits
+    return obs
+
+
+def write_longitudinal_data(id, edits, fh):
+    years = edits.keys()
+    years.sort()
+    for year in years:
+        months = edits[year].keys()
+        months = [int(m) for m in months]
+        months.sort()
+        for m in months:
+            date = datetime.date(int(year), int(m), 1)
+            fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)]))
+
+
+def expand_headers(headers, vars_to_expand, obs):
+    for var in vars_to_expand:
+        l = len(obs[var])
+        pos = headers.index(var)
+        for i in xrange(l):
+            if var.endswith('year'):
+                suffix = 2001 + i
+            elif var.endswith('edits'):
+                suffix = 1 + i
+            headers.insert(pos + i, '%s_%s' % (var, suffix))
+        headers.remove(var)
+    return headers
+
+
+def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs):
+    debug = kwargs.pop('debug')
+    dbname = kwargs.pop('dbname')
+    mongo = db.init_mongo_db(dbname)
+    editors = mongo['dataset']
+    name = dbname + '_long_editors.csv'
+    fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
+    x = 0
+    vars_to_expand = []
+    while True:
+        try:
+            id = input_queue.get(block=False)
+            obs = editors.find_one({'editor': id}, {'monthly_edits': 1})
+            if x == 0:
+                headers = obs.keys()
+                headers.sort()
+                headers = expand_headers(headers, vars_to_expand, obs)
+                utils.write_list_to_csv(headers, fh)
+            write_longitudinal_data(id, obs['monthly_edits'], fh)
+            x += 1
+        except Empty:
+            break
+    fh.close()
+
+
+def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs):
+    dbname = kwargs.get('dbname')
+    pbar = kwargs.get('pbar')
+    mongo = db.init_mongo_db(dbname)
+    editors = mongo['dataset']
+    year = datetime.datetime.now().year + 1
+    begin = year - 2001
+    p = [3, 6, 9]
+    periods = [y * 12 for y in xrange(1, begin)]
+    periods = p + periods
+    data = {}
+    while True:
+        try:
+            id = input_queue.get(block=False)
+            obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
+            first_edit = obs['first_edit']
+            last_edit = obs['final_edit']
+            for y in xrange(2001, year):
+                if y not in data:
+                    data[y] = {}
+                    data[y]['n'] = 0
+                window_end = datetime.datetime(y, 12, 31)
+                if window_end > datetime.datetime.now():
+                    now = datetime.datetime.now()
+                    m = now.month - 1  # Dump files always lag at least one month.
+                    d = now.day
+                    window_end = datetime.datetime(y, m, d)
+                edits = []
+                for period in periods:
+                    if period not in data[y]:
+                        data[y][period] = 0
+                    window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period)
+                    if window_start < datetime.datetime(2001, 1, 1):
+                        window_start = datetime.datetime(2001, 1, 1)
+                    if date_falls_in_window(window_start, window_end, first_edit, last_edit):
+                        edits.append(period)
+                if edits:
+                    p = min(edits)
+                    data[y]['n'] += 1
+                    data[y][p] += 1
+        except Empty:
+            break
+    utils.store_object(data, settings.binary_location, 'cohort_data')


def date_falls_in_window(window_start, window_end, first_edit, last_edit):
+    return window_start <= first_edit <= window_end
+
+
+def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs):
+    dbname = kwargs.pop('dbname')
+    debug = kwargs.pop('debug', False)
+    mongo = db.init_mongo_db(dbname)
+    editors = mongo['dataset']
+    name = dbname + '_wide_editors.csv'
+    fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
+    x = 0
+    vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year']
+    while True:
+        try:
+            if debug:
+                id = u'99797'
+            else:
+                id = input_queue.get(block=False)
+            print input_queue.qsize()
+            obs = editors.find_one({'editor': id})
+            obs = expand_observations(obs, vars_to_expand)
+            if x == 0:
+                headers = obs.keys()
+                headers.sort()
+                headers = expand_headers(headers, vars_to_expand, obs)
+                utils.write_list_to_csv(headers, fh)
+            data = []
+            keys = obs.keys()
+            keys.sort()
+            for key in keys:
+                data.append(obs[key])
+            utils.write_list_to_csv(data, fh)
+
+            x += 1
+        except Empty:
+            break
+    fh.close()
+
+
+def retrieve_edits_by_contributor_launcher():
+    pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors')
+
+
+def debug_retrieve_edits_by_contributor_launcher(dbname):
+    kwargs = {'debug': False,
+              'dbname': dbname,
+              }
+    ids = retrieve_editor_ids_mongo(dbname, 'editors')
+    input_queue = pc.load_queue(ids)
+    q = Queue()
+    generate_wide_editor_dataset(input_queue, q, False, **kwargs)
+
+
+def generate_editor_dataset_launcher(dbname):
+    kwargs = {'nr_input_processors': 1,
+              'nr_output_processors': 1,
+              'debug': False,
+              'dbname': dbname,
+              'poison_pill': False,
+              'pbar': True
+              }
+    ids = retrieve_editor_ids_mongo(dbname, 'editors')
+    ids = list(ids)
+    chunks = dict({0: ids})
+    pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs)
+
+
+def generate_editor_dataset_debug(dbname):
+    ids = retrieve_editor_ids_mongo(dbname, 'editors')
+    input_queue = pc.load_queue(ids)
+    kwargs = {'nr_input_processors': 1,
+              'nr_output_processors': 1,
+              'debug': True,
+              'dbname': dbname,
+              }
+    generate_wide_editor_dataset(input_queue, False, False, **kwargs)
+
+
+if __name__ == '__main__':
+    #generate_editor_dataset_debug('test')
+    generate_editor_dataset_launcher('enwiki')
+    #debug_retrieve_edits_by_contributor_launcher()

Property changes on: trunk/tools/editor_trends/etl/construct_datasets.py
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:eol-style
+ native
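A worked illustration of the window arithmetic in generate_cohort_analysis, standalone and with a shortened period list (the real function builds periods up to the current year):

    import datetime
    from dateutil.relativedelta import relativedelta

    y = 2004
    periods = [3, 6, 9] + [m * 12 for m in xrange(1, 4)]   # months before year end
    window_end = datetime.datetime(y, 12, 31)
    for period in periods:
        window_start = window_end - relativedelta(months=period)
        if window_start < datetime.datetime(2001, 1, 1):
            window_start = datetime.datetime(2001, 1, 1)
        # an editor whose first edit falls inside [window_start, window_end]
        # is counted in the smallest matching period for cohort year y
        print period, window_start.date(), window_end.date()
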
Added: trunk/tools/editor_trends/etl/extract.py
===================================================================
--- trunk/tools/editor_trends/etl/extract.py	(rev 0)
+++ trunk/tools/editor_trends/etl/extract.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,338 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-10-21'
+__version__ = '0.1'
+
+# Default Python libraries (Python >= 2.6)
+import sys
+import os
+import time
+import datetime
+import codecs
+import math
+import cStringIO
+import re
+from operator import itemgetter
+import xml.etree.cElementTree as cElementTree
+from multiprocessing import Queue, JoinableQueue
+from Queue import Empty
+import pymongo
+
+# Custom written files
+import configuration
+settings = configuration.Settings()
+from utils import utils, models
+from database import db_settings
+from database import db
+from database import cache
+from wikitree import xml
+from statistics import dataset
+from utils import process_constructor as pc
+
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    pass
+
+
+def determine_username_is_bot(username, kwargs):
+    '''
+    @username is the xml element containing the id of the user
+    @kwargs should have a list with all the bot ids
+
+    Returns 1 if the username id is in the bot id list, 0 otherwise.
+    '''
+    ids = kwargs.get('bots', [])
+    if ids is None:
+        ids = []
+    if username is not None and username.text is not None:
+        id = username.text
+        if id in ids:
+            return 1
+    return 0
+
+
+def extract_contributor_id(contributor, kwargs):
+    '''
+    @contributor is the xml contributor node containing a number of attributes
+
+    Currently, we are only interested in registered contributors, hence we
+    ignore anonymous editors. If you are interested in collecting data on
+    anonymous editors then add the string 'ip' to the tags variable.
+    '''
+    tags = ['id']
+    if contributor.get('deleted'):
+        return -1  # ASK: Not sure if this is the best way to code deleted contributors.
+    for elem in contributor:
+        if elem.tag in tags:
+            if elem.text is not None:
+                return elem.text.decode('utf-8')
+            else:
+                return -1
+
+
+def output_editor_information(elem, output, **kwargs):
+    '''
+    @elem is an XML element containing 1 revision from a page
+    @output is where to store the data, either a queue or a filehandle
+    @**kwargs contains extra information
+
+    The variable tags determines which attributes are parsed; the values in
+    this dictionary are the functions used to extract the data.
+    '''
+    tags = {'contributor': {'editor': extract_contributor_id,
+                            'bot': determine_username_is_bot},
+            'timestamp': {'date': xml.extract_text},
+            }
+    vars = {}
+    headers = ['editor', 'date', 'article']
+    destination = kwargs.pop('destination')
+    revisions = elem.findall('revision')
+    for revision in revisions:
+        vars['article'] = elem.find('id').text.decode(settings.encoding)
+        elements = revision.getchildren()
+        for tag, functions in tags.iteritems():
+            xml_node = xml.retrieve_xml_node(elements, tag)
+            for var, function in functions.iteritems():
+                vars[var] = function(xml_node, kwargs)
+
+        if vars['bot'] == 0 and vars['editor'] != -1 and vars['editor'] is not None:
+            vars.pop('bot')
+            if destination == 'queue':
+                vars['date'] = utils.convert_timestamp_to_date(vars['date'])
+                output.put(vars)
+            elif destination == 'file':
+                data = []
+                for head in headers:
+                    data.append(vars[head])
+                utils.write_list_to_csv(data, output)
+        vars = {}
+
+
+def parse_editors(xml_queue, data_queue, **kwargs):
+    '''
+    @xml_queue contains the filenames of the files to be parsed
+    @data_queue is an instance of Queue where the extracted data is stored for
+    further processing
+    @pbar is an instance of progressbar to display the progress
+    @bots is a list of ids of known Wikipedia bots
+    @debug is a flag to indicate whether the function is called for debugging.
+
+    Output is the data_queue that will be used by store_editors()
+    '''
+    input = kwargs.get('input', None)
+    output = kwargs.get('output', None)
+    debug = kwargs.get('debug', False)
+    destination = kwargs.get('destination', 'file')
+    bots = kwargs.get('bots', None)
+    pbar = kwargs.get('pbar', None)
+    if settings.debug:
+        messages = {}
+        vars = {}
+
+    while True:
+        try:
+            if debug:
+                file = xml_queue
+            else:
+                file = xml_queue.get(block=False)
+            if file is None:
+                print 'Swallowed a poison pill'
+                break
+
+            data = xml.read_input(utils.create_txt_filehandle(input, file, 'r',
+                                                              encoding=settings.encoding))
+            if destination == 'file':
+                name = file[:-4] + '.txt'
+                fh = utils.create_txt_filehandle(output, name, 'w', settings.encoding)
+            else:
+                fh = output  # in queue mode the results go straight onto the shared queue
+            for raw_data in data:
+                xml_buffer = cStringIO.StringIO()
+                raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
+
+                try:
+                    raw_data = ''.join(raw_data)
+                    xml_buffer.write(raw_data)
+                    elem = cElementTree.XML(xml_buffer.getvalue())
+                    output_editor_information(elem, fh, bots=bots, destination=destination)
+                except SyntaxError, error:
+                    print error
+                    '''
+                    There are a few cases with invalid tokens; they are fixed
+                    here and then reinserted into the XML DOM:
+                    data = convert_html_entities(xml_buffer.getvalue())
+                    elem = cElementTree.XML(data)
+                    output_editor_information(elem)
+                    '''
+                    if settings.debug:
+                        utils.track_errors(xml_buffer, error, file, messages)
+                except UnicodeEncodeError, error:
+                    print error
+                    if settings.debug:
+                        utils.track_errors(xml_buffer, error, file, messages)
+                except MemoryError, error:
+                    print file, error
+                    print raw_data[:12]
+                    print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data])
+            if destination == 'queue':
+                output.put('NEXT')
+                while True:
+                    if output.qsize() < 100000:
+                        break
+                    else:
+                        time.sleep(10)
+                        print 'Still sleeping, queue is %s items long' % output.qsize()
+            else:
+                fh.close()
+
+            if pbar:
+                print file, xml_queue.qsize()
+
+            if debug:
+                break
+
+        except Empty:
+            break
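+
+    # All input files are processed; the None below is the poison pill that
+    # tells the consumer (store_editors) to shut down.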
+    if destination == 'queue':
+        data_queue.put(None)
+
+    if settings.debug:
+        utils.report_error_messages(messages, parse_editors)
+
+
+def store_editors(data_queue, **kwargs):
+    '''
+    @data_queue is an instance of Queue containing information extracted by
+    parse_editors()
+    @pids is a list of PIDs used to check if other processes are finished
+    running
+    @dbname is the name of the MongoDB database where the information is
+    stored.
+    '''
+    dbname = kwargs.get('dbname', None)
+    mongo = db.init_mongo_db(dbname)
+    collection = mongo['editors']
+    collection.ensure_index('editor')
+    editor_cache = cache.EditorCache(collection)
+
+    while True:
+        try:
+            edit = data_queue.get(block=False)
+            data_queue.task_done()
+            if edit is None:
+                print 'Swallowing poison pill'
+                break
+            elif edit == 'NEXT':
+                editor_cache.add('NEXT', '')
+            else:
+                contributor = edit['editor']
+                value = {'date': edit['date'], 'article': edit['article']}
+                editor_cache.add(contributor, value)
+        except Empty:
+            '''
+            This checks whether the Queue is empty because the preprocessors
+            are finished or because this function is faster in emptying the
+            Queue than the preprocessors are able to fill it. If the
+            preprocessors are finished and this Queue is empty then break,
+            else wait for the Queue to fill.
+            '''
+            pass
+
+    print 'Emptying entire cache.'
+    editor_cache.store()
+    print 'Time elapsed: %s and processed %s items.' % (datetime.datetime.now() - editor_cache.init_time, editor_cache.cumulative_n)
+
+
+def load_cache_objects():
+    cache = {}
+    files = utils.retrieve_file_list(settings.binary_location, '.bin')
+    for x, file in enumerate(files):
+        cache[x] = utils.load_object(settings.binary_location, file)
+    return cache
+
+
+def search_cache_for_missed_editors(dbname):
+    mongo = db.init_mongo_db(dbname)
+    collection = mongo['editors']
+    editor_cache = cache.EditorCache(collection)
+    cached = load_cache_objects()  # the local name must not shadow the cache module used above
+    for c in cached:
+        for editor in cached[c]:
+            editor_cache.add(editor, cached[c][editor])
+        cached[c] = {}
+        editor_cache.add('NEXT', '')
+    cached = {}
+
+
+def load_bot_ids():
+    '''
+    Loader function to retrieve the list of ids of known Wikipedia bots.
+    '''
+    ids = {}
+    mongo = db.init_mongo_db('bots')
+    bots = mongo['ids']
+    cursor = bots.find()
+    for bot in cursor:
+        ids[bot['id']] = bot['name']
+    return ids
+
+
+def run_parse_editors(location, language, project):
+    ids = load_bot_ids()
+    base = os.path.join(location, language, project)
+    input = os.path.join(base, 'chunks')
+    output = os.path.join(base, 'txt')
+    settings.verify_environment([input, output])
+    files = utils.retrieve_file_list(input, 'xml')
+
+    kwargs = {'bots': ids,
+              'dbname': language + project,
+              'language': language,
+              'project': project,
+              'pbar': True,
+              'destination': 'file',
+              'nr_input_processors': settings.number_of_processes,
+              'nr_output_processors': settings.number_of_processes,
+              'input': input,
+              'output': output,
+              }
+
+    chunks = utils.split_list(files, settings.number_of_processes)
+    pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs)
+
+
+def debug_parse_editors(dbname):
+    q = JoinableQueue()
+    parse_editors('522.xml', q, debug=True, destination='file')
+    store_editors(q, dbname=dbname)
+
+
+if __name__ == "__main__":
+    #debug_parse_editors('test2')
+    run_parse_editors(settings.input_location, 'en', 'wiki')
+    pass

Property changes on: trunk/tools/editor_trends/etl/extract.py
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:eol-style
+ native
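With destination='file', output_editor_information writes one tab-separated row per qualifying revision, in the order of the headers list; for example (values illustrative):

    385098	2010-11-16T23:13:29Z	12345

i.e. editor id, revision timestamp, article id. The timestamps are converted to dates later, by the loader.
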
Added: trunk/tools/editor_trends/etl/loader.py
===================================================================
--- trunk/tools/editor_trends/etl/loader.py	(rev 0)
+++ trunk/tools/editor_trends/etl/loader.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-11-16'
+__version__ = '0.1'
+
+
+import os
+import sys
+from Queue import Empty
+
+sys.path.append('..')
+import configuration
+settings = configuration.Settings()
+from database import db
+from database import cache
+from utils import utils
+from utils import process_constructor as pc
+
+
+def store_editors(input, filename, dbname):
+    fh = utils.create_txt_filehandle(input, filename, 'r', settings.encoding)
+    mongo = db.init_mongo_db(dbname)
+    collection = mongo['test']
+    collection.ensure_index('editor')
+    editor_cache = cache.EditorCache(collection)
+    prev_contributor = -1
+    x = 0
+    edits = 0
+    editors = set()
+    for line in fh:
+        line = line.strip().split('\t')  # rows are editor <tab> timestamp <tab> article_id
+        if len(line) < 3:
+            continue
+        contributor = int(line[0])
+        if prev_contributor != contributor:
+            if edits >= 10:
+                result = editor_cache.add(prev_contributor, 'NEXT')
+                if result:
+                    editors.add(prev_contributor)
+                    result = None
+                x += 1
+                print 'Stored %s editors' % x
+            else:
+                editor_cache.clear(prev_contributor)
+            edits = 0
+        edits += 1
+        date = utils.convert_timestamp_to_date(line[1])
+        article_id = int(line[2])
+        value = {'date': date, 'article': article_id}
+        editor_cache.add(contributor, value)
+        prev_contributor = contributor
+    fh.close()
+    utils.store_object(editors, settings.binary_location, 'editors')
+
+
+def mergesort_external_launcher(dbname, input, output):
+    files = utils.retrieve_file_list(input, 'txt', mask='')
+    x = 0
+    maxval = 99999
+    while maxval >= settings.max_filehandles:
+        x += 1.0
+        maxval = round(len(files) / x)
+    chunks = utils.split_list(files, int(x))
+    '''1st iteration external mergesort'''
+    for chunk in chunks:
+        filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.encoding)
+                       for file in chunks[chunk]]
+        filename = merge_sorted_files(output, filehandles, chunk)
+        for fh in filehandles:
+            fh.close()
+    '''2nd iteration external mergesort, if necessary'''
+    if len(chunks) > 1:
+        files = utils.retrieve_file_list(output, 'txt', mask='[merged]')
+        filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.encoding)
+                       for file in files]
+        filename = merge_sorted_files(output, filehandles, 'final')
+        for fh in filehandles:
+            fh.close()
+    filename = 'merged_final.txt'
+    store_editors(output, filename, dbname)
+
+
+def mergesort_feeder(input_queue, result_queue, **kwargs):
+    input = kwargs.get('input', None)
+    output = kwargs.get('output', None)
+    while True:
+        try:
+            file = input_queue.get(block=False)
+            fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
+            data = fh.readlines()
+            fh.close()
+            data = [d.replace('\n', '') for d in data]
+            data = [d.split('\t') for d in data]
+            sorted_data = mergesort(data)
+            write_sorted_file(sorted_data, file, output)
+        except Empty:
+            break
+
+
+def mergesort_launcher(input, output):
+    kwargs = {'pbar': True,
+              'nr_input_processors': settings.number_of_processes,
+              'nr_output_processors': settings.number_of_processes,
+              'input': input,
+              'output': output,
+              'poison_pill': False
+              }
+    files = utils.retrieve_file_list(input, 'txt')
+    chunks = utils.split_list(files, settings.number_of_processes)
+    pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
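Note that merge_sorted_files, mergesort and write_sorted_file are referenced above but are not part of this commit. A minimal sketch of the k-way merge step, assuming each input file holds tab-separated lines already sorted on their first field (the helper name and output naming are assumptions chosen to match the '[merged]' mask above):

    import heapq
    import os
    import codecs

    def merge_sorted_files(output, filehandles, chunk_id, encoding='utf-8'):
        # k-way merge of already-sorted text files into 'merged_<chunk_id>.txt'
        filename = 'merged_%s.txt' % chunk_id
        fh = codecs.open(os.path.join(output, filename), 'w', encoding=encoding)
        for line in heapq.merge(*filehandles):
            fh.write(line)
        fh.close()
        return filename
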
+def debug_mergesort_feeder(input, output):
+    kwargs = {
+        'input': input,
+        'output': output,
+    }
+    files = utils.retrieve_file_list(input, 'txt')
+    chunks = utils.split_list(files, settings.number_of_processes)
+    q = pc.load_queue(chunks[0])
+    mergesort_feeder(q, False, **kwargs)
+
+
+if __name__ == '__main__':
+    input = os.path.join(settings.input_location, 'en', 'wiki', 'txt')
+    output = os.path.join(settings.input_location, 'en', 'wiki', 'sorted')
+    dbname = 'enwiki'
+    mergesort_launcher(input, output)
+    mergesort_external_launcher(dbname, output, output)
\ No newline at end of file

Property changes on: trunk/tools/editor_trends/etl/loader.py
___________________________________________________________________
Added: svn:eol-style
+ native

Added: trunk/tools/editor_trends/etl/optimize_editors.py
===================================================================
--- trunk/tools/editor_trends/etl/optimize_editors.py	(rev 0)
+++ trunk/tools/editor_trends/etl/optimize_editors.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,172 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-11-02'
+__version__ = '0.1'
+
+from multiprocessing import Queue
+from Queue import Empty
+from operator import itemgetter
+import datetime
+import copy
+
+import configuration
+settings = configuration.Settings()
+from database import db
+from utils import process_constructor as pc
+from utils import utils
+import construct_datasets
+
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    pass
+
+
+def create_datacontainer(init_value=0):
+    '''
+    This function initializes a dictionary keyed by year (from 2001 through
+    the current year) with @init_value as each value. In most cases this will
+    be zero, so the dictionary acts as a running tally for a variable, but
+    @init_value can also be a list [], a dictionary {}, or a set().
+    '''
+    data = {}
+    year = datetime.datetime.now().year + 1
+    for x in xrange(2001, year):
+        # copy per year; sharing one mutable object (e.g. a single set)
+        # across all years was a latent bug
+        data[str(x)] = copy.deepcopy(init_value)
+    return data
+
+
+def add_months_to_datacontainer(datacontainer):
+    for dc in datacontainer:
+        datacontainer[dc] = {}
+        for x in xrange(1, 13):
+            datacontainer[dc][str(x)] = 0
+    return datacontainer
+
+
+def determine_edits_by_month(edits):
+    datacontainer = create_datacontainer(init_value=0)
+    datacontainer = add_months_to_datacontainer(datacontainer)
+    for year in edits:
+        months = set()
+        for edit in edits[year]:
+            m = str(edit['date'].month)
+            if m not in months:
+                datacontainer[year][m] = 1
+                months.add(m)
+            if len(months) == 12:
+                break
+    return datacontainer
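A standalone demonstration of the aliasing that the copy.deepcopy above guards against; with a shared mutable init_value, every year would point at the same object:

    data = {}
    shared = set()
    for y in ('2001', '2002'):
        data[y] = shared            # every year aliases one set
    data['2001'].add('Some article')
    print len(data['2002'])         # -> 1; articles would leak across years
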
+def determine_edits_by_year(dates):
+    '''
+    This function counts the number of edits by year made by a particular
+    editor.
+    '''
+    edits = create_datacontainer()
+    for date in dates:
+        year = str(date['date'].year)
+        edits[year] += 1
+    return edits
+
+
+def determine_articles_by_year(dates):
+    '''
+    This function counts the number of unique articles by year edited by a
+    particular editor.
+    '''
+    articles = create_datacontainer(set())
+    for date in dates:
+        year = str(date['date'].year)
+        articles[year].add(date['article'])
+    for year in articles:
+        articles[year] = len(articles[year])
+    return articles
+
+
+def sort_edits(edits):
+    edits = utils.merge_list(edits)
+    return sorted(edits, key=itemgetter('date'))
+
+
+def optimize_editors(input_queue, result_queue, pbar, **kwargs):
+    dbname = kwargs.pop('dbname')
+    mongo = db.init_mongo_db(dbname)
+    input = mongo['test']
+    output = mongo['dataset']
+    output.ensure_index('editor')
+    output.ensure_index('year_joined')
+    definition = kwargs.pop('definition')
+    while True:
+        try:
+            id = input_queue.get(block=False)
+            editor = input.find_one({'editor': id})
+            if editor is None:
+                continue
+            edits = editor['edits']
+            monthly_edits = determine_edits_by_month(edits)
+            edits = sort_edits(edits)
+            edit_count = len(edits)
+            new_wikipedian = edits[9]['date']
+            first_edit = edits[0]['date']
+            final_edit = edits[-1]['date']
+            edits_by_year = determine_edits_by_year(edits)
+            articles_by_year = determine_articles_by_year(edits)
+
+            edits = edits[:10]
+
+            output.insert({'editor': id, 'edits': edits,
+                           'edits_by_year': edits_by_year,
+                           'new_wikipedian': new_wikipedian,
+                           'edit_count': edit_count,
+                           'final_edit': final_edit,
+                           'first_edit': first_edit,
+                           'articles_by_year': articles_by_year,
+                           'monthly_edits': monthly_edits})
+            print 'Items left: %s' % input_queue.qsize()
+        except Empty:
+            break
+
+
+def run_optimize_editors(dbname):
+    ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
+    kwargs = {'definition': 'traditional',
+              'pbar': True,
+              'dbname': dbname,
+              'nr_input_processors': 1,
+              'nr_output_processors': 0,
+              'poison_pill': False
+              }
+    print len(ids)
+    ids = list(ids)
+    chunks = dict({0: ids})
+    pc.build_scaffolding(pc.load_queue, optimize_editors, chunks, False, False, **kwargs)
+
+
+def debug_optimize_editors(dbname):
+    ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
+    q = pc.load_queue(ids)
+    kwargs = {'definition': 'traditional',
+              'dbname': dbname
+              }
+    optimize_editors(q, False, True, **kwargs)
+
+
+if __name__ == '__main__':
+    #debug_optimize_editors('test')
+    run_optimize_editors('enwiki')
\ No newline at end of file

Property changes on: trunk/tools/editor_trends/etl/optimize_editors.py
___________________________________________________________________
Added: svn:eol-style
+ native
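For reference, each document optimize_editors inserts into the 'dataset' collection has the following shape; this sample is illustrative (values invented, year keys trimmed), not taken from the database:

    from datetime import datetime

    sample_doc = {
        'editor': '99797',
        'edits': [{'date': datetime(2004, 1, 9), 'article': 12345}],  # first 10 edits, sorted by date
        'edits_by_year': {'2001': 0, '2010': 12},   # one key per year 2001..now
        'new_wikipedian': datetime(2004, 5, 1),     # date of the 10th edit
        'edit_count': 1542,
        'first_edit': datetime(2004, 1, 9),
        'final_edit': datetime(2010, 11, 1),
        'articles_by_year': {'2001': 0, '2010': 8},
        'monthly_edits': {'2010': {'1': 2, '12': 0}},
    }
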
Added: trunk/tools/editor_trends/etl/xml2pig.py
===================================================================
--- trunk/tools/editor_trends/etl/xml2pig.py	(rev 0)
+++ trunk/tools/editor_trends/etl/xml2pig.py	2010-11-16 23:13:29 UTC (rev 76846)
@@ -0,0 +1,30 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanli...@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanli...@gmail.com)', ])
+__author_email__ = 'dvanliere at gmail dot com'
+__date__ = '2010-11-15'
+__version__ = '0.1'
+
+import sys
+sys.path.append('..')
+
+import os
+import xml.etree.cElementTree as cElementTree
+
+import configuration
+settings = configuration.Settings()
+import split_settings.input_filename