http://www.mediawiki.org/wiki/Special:Code/MediaWiki/93712
Revision: 93712 Author: rfaulk Date: 2011-08-02 09:01:17 +0000 (Tue, 02 Aug 2011) Log Message: ----------- built dataloader to process category hierarchies Modified Paths: -------------- trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py Modified: trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py =================================================================== --- trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py 2011-08-02 08:35:23 UTC (rev 93711) +++ trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py 2011-08-02 09:01:17 UTC (rev 93712) @@ -1,6 +1,6 @@ """ -WSOR dataloader class for the MySQL slave of enwiki +WSOR dataloader class for the MySQL slave of enwiki and user dbs """ @@ -13,11 +13,17 @@ """ Import python base modules """ import sys, getopt, re, datetime, logging, MySQLdb, settings +import networkx as nx """ Import Analytics modules """ from Fundraiser_Tools.classes.DataLoader import DataLoader +""" Configure the logger """ +LOGGING_STREAM = sys.stderr +logging.basicConfig(level=logging.DEBUG, stream=LOGGING_STREAM, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%b-%d %H:%M:%S') + + """ Inherits DataLoader @@ -27,10 +33,12 @@ class WSORSlaveDataLoader(DataLoader): def __init__(self): - - """ Configure the logger """ - LOGGING_STREAM = sys.stderr - logging.basicConfig(level=logging.DEBUG, stream=LOGGING_STREAM, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%b-%d %H:%M:%S') + + self.init_db() + + def __del__(self): + + self.close_db() """ @@ -42,9 +50,9 @@ """ Establish connection """ try: - self._db_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db__, port=settings.__db_port__, passwd=settings.__pass__) + self._db_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db__, port=settings.__db_port__, passwd=settings.__pass__) self._db_enwiki_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db_enwikislave__, port=settings.__db_port__, passwd=settings.__pass__) - logging.info('Successfully connected.\n') + logging.info('Successfully connected.') except: logging.DEBUG('Could not establish a connection to %s @ %s : %s' % (settings.__user__, settings.__db_server__, settings.__db__)) return @@ -66,6 +74,255 @@ """ Inherits WSORSlaveDataLoader + DataLoader class for querying category tables + +""" +class CategoryLoader(WSORSlaveDataLoader): + + def __init__(self): + + self._query_names_['build_subcat_tbl'] = "CREATE TABLE rfaulk.categorylinks_cp select * from enwiki.categorylinks where cl_type = 'subcat'" + self._query_names_['drop_subcat_tbl'] = "drop table if exists rfaulk.categorylinks_cp;" + self._query_names_['get_first_rec'] = "select cl_from from categorylinks_cp limit 1" + self._query_names_['get_category_page_title'] = "select page_title from enwiki.page where page_id = %s" + self._query_names_['get_category_page_id'] = "select page_id from enwiki.page where page_title = '%s' and page_namespace = 14" + self._query_names_['get_subcategories'] = "select cl_to from categorylinks_cp where cl_from = %s" + self._query_names_['delete_from_recs'] = "delete from rfaulk.categorylinks_cp where cl_from = %s" + self._query_names_['is_empty'] = "select * from rfaulk.categorylinks_cp limit 1" + + WSORSlaveDataLoader.__init__(self) + logging.info('Creating CategoryLoader') + + """ + Retrives the integer page id + """ + def get_page_id(self, page_title): + + try: + sql = self._query_names_['get_category_page_id'] % page_title + #logging.info('Executing: ' + sql) + results = self.execute_SQL(sql) + id = int(results[0][0]) + + except Exception as inst: + + logging.error('Could not retrieve page_id.') + return -1 + + return id + + """ + Retrives the string page title + """ + def get_page_title(self, page_id): + + try: + sql = self._query_names_['get_category_page_title'] % page_id + #logging.info('Executing: ' + sql) + results = self.execute_SQL(sql) + title = str(results[0][0]) + + except Exception as inst: + + logging.error('Could not retrieve page_title.') + return -1 + + return title + + """ + Look at the first category in caegorylinks_cp + """ + def get_first_record_from_category_links(self): + + try: + + sql = self._query_names_['get_first_rec'] + #logging.info('Executing: ' + sql) + results = self.execute_SQL(sql) + + #logging.info('Retrieved first row from rfaulk.categorylinks_cp.') + category_page_id = int(results[0][0]) + category_page_title = self.get_page_title(category_page_id) + + except Exception as inst: + + logging.error('Could not retrieve page_title.') + return '' + + return category_page_title + + + """ + Finds and returns a list of sub categories + """ + def get_subcategories(self, category_title): + + subcategories = list() + + # category_upper, category_lower, category_camel = self.normalize_field_cl_from(category) + + """ Retrieve the sub categories and add to list """ + category_page_id = self.get_page_id(category_title) + category_page_id_str = str(category_page_id) + sql_select = self._query_names_['get_subcategories'] % category_page_id_str + sql_delete = self._query_names_['delete_from_recs'] % category_page_id_str + + try: + #logging.info('Executing: ' + sql_select) + results = self.execute_SQL(sql_select) + #logging.info('Retrieved sub-categories of %s.' % category_page_id_str) + + for row in results: + subcategories.append(row[0]) + + #logging.info('Executing: ' + sql_delete) + self.execute_SQL(sql_delete) + #logging.info('Removed references from the category with page_id %s.' % category_page_id_str) + + except Exception as inst: + + logging.error('Could not retrieve sub-categories.') + return [] + + return subcategories + + + + """ + Recursively builds subtree of categories + """ + def build_category_tree(self, directed_graph, category_title): + + # logging.info('Creating directed links for category: %s' % category_title) + + """ Get sub-categories """ + sub_categories = self.get_subcategories(category_title) + + """ Build nodes for each """ + for sub_cat in sub_categories: + + directed_graph.add_weighted_edges_from([(category_title, sub_cat,1)]) + self.build_category_tree(directed_graph, sub_cat) + + return directed_graph + + + """ + Execution entry point of the class - builds a full category hierarchy from categorylinks + """ + def extract_hierarchy(self): + + #self.drop_category_links_cp_table() + #self.create_category_links_cp_table() + + """ Create graph """ + logging.info('Initializing directed graph...') + directed_graph = nx.DiGraph() + + """ while there are rows left in categorylinks_cp """ + while(not self.is_empty()): + + category_title = self.get_first_record_from_category_links() + self.build_category_tree(directed_graph, category_title) + directed_graph.add_weighted_edges_from([('ALL', category_title, 1)]) + + logging.info('Category links finished processing.') + + return directed_graph + + + """ drop rfaulk.categorylinks_cp """ + def drop_category_links_cp_table(self): + + try: + sql = self._query_names_['drop_subcat_tbl'] + logging.info('Executing: ' + sql) + + self._cur_.execute(sql) + logging.info('Dropped rfaulk.categorylinks_cp table.') + + except Exception as inst: + + logging.error('Could not execute: %s\n' % sql) + logging.error(str(type(inst))) # the exception instance + logging.error(str(inst.args)) # arguments stored in .args + logging.error(inst.__str__()) # __str__ allows args to printed directly + + + """ create rfaulk.categorylinks_cp """ + def create_category_links_cp_table(self): + + try: + sql = self._query_names_['build_subcat_tbl'] + logging.info('Executing: ' + sql) + + self._cur_.execute(sql) + logging.info('Created rfaulk.categorylinks_cp table from enwiki.categorylinks_cp.') + + except Exception as inst: + + logging.error('Could not execute: %s\n' % sql) + logging.error(str(type(inst))) # the exception instance + logging.error(str(inst.args)) # arguments stored in .args + logging.error(inst.__str__()) # __str__ allows args to printed directly + + + + """ + Are there any records remaining in rfaulk.categorylinks_cp ?? + """ + def is_empty(self): + + sql = self._query_names_['is_empty'] + + try: + self._cur_.execute(sql) + rows = self._cur_.fetchone() + + except Exception as inst: + + logging.error('Could not execute: %s\n' % sql) + logging.error(str(type(inst))) # the exception instance + logging.error(str(inst.args)) # arguments stored in .args + logging.error(inst.__str__()) # __str__ allows args to printed directly + + return True + + if len(rows) > 0: + return False + else: + return True + + """ + The cl_from key is formatted in uppercase with non-uniform whitespace + + def normalize_field_cl_from(self, category): + + category = category.lower() + words = category.split('\n')[0] # only keep text before the a carraige return + words = words.split() + len_words = len(words) + + category = '' + category_camel = '' + + + for i in range(len_words - 1): + category = category + words[i] + ' ' + category_camel = category_camel + words[i][0].upper() + words[i][1:] + ' ' + + category = category + words[len_words - 1] + category_camel = category_camel + words[len_words - 1][0].upper() + words[len_words - 1][1:] + + category_upper = category.upper() + category_lower = category.lower() + + return category_upper, category_lower, category_camel + """ + +""" + Inherits WSORSlaveDataLoader + DataLoader class for vandal reversion related queries """ @@ -74,21 +331,21 @@ def __init__(self, query_key): - DataLoader.__init__(self) + WSORSlaveDataLoader.__init__(self) logging.info('Creating VadalLoader') """ DEFINE SQL queries """ self._query_names_['query_test'] = 'select count(*) from revert_20110115' - self._query_names_['query_vandal_count'] = 'select revision_id, username, user_id, sum(is_vandalism) from reverted_20110115 group by 1,2,3' - self._query_names_['query_total_reverts'] = 'select revision_id, username, user_id, sum(is_vandalism) from reverted_20110115' + self._query_names_['query_vandal_count'] = 'select revision_id, sum(is_vandalism) from revert_20110115 group by 1' + self._query_names_['query_total_reverts'] = '' """ ASSIGN query """ try: - _sql_ = self._query_names_[query_key] + self._sql_ = self._query_names_[query_key] except KeyError: logging.debug('Query does not exist\n') @@ -98,12 +355,11 @@ """ def run_query(self): - logging.info('Running VandalLoader') - self.init_db() try: - self._cur_.execute(self._query_names_['query_test']) + logging.info('Running VandalLoader') + self._cur_.execute(self._sql_) """ GET THE COLUMN NAMES FROM THE QUERY RESULTS """ self._col_names_ = list() @@ -116,9 +372,10 @@ except Exception as inst: - logging.debug(str(type(inst))) # the exception instance - logging.debug(str(inst.args)) # arguments stored in .args - logging.debug(inst.__str__()) # __str__ allows args to printed directly + logging.error('Could not execute: %s\n' % self._sql_) + logging.error(str(type(inst))) # the exception instance + logging.error(str(inst.args)) # arguments stored in .args + logging.error(inst.__str__()) # __str__ allows args to printed directly # self._db_.rollback() _______________________________________________ MediaWiki-CVS mailing list MediaWiki-CVS@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs