Mforns has uploaded a new change for review. https://gerrit.wikimedia.org/r/192319
Change subject: [WIP] ...................................................................... [WIP] Bug: T89251 Change-Id: I5885cd85499501741b78fbbc95225939dc46b329 --- A reportupdater/README.md A reportupdater/reportupdater/__init__.py A reportupdater/reportupdater/executor.py A reportupdater/reportupdater/reader.py A reportupdater/reportupdater/report.py A reportupdater/reportupdater/selector.py A reportupdater/reportupdater/update_reports.py A reportupdater/reportupdater/updater.py 8 files changed, 378 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/limn-mobile-data refs/changes/19/192319/1 diff --git a/reportupdater/README.md b/reportupdater/README.md new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/reportupdater/README.md diff --git a/reportupdater/reportupdater/__init__.py b/reportupdater/reportupdater/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/reportupdater/reportupdater/__init__.py diff --git a/reportupdater/reportupdater/executor.py b/reportupdater/reportupdater/executor.py new file mode 100644 index 0000000..e3e5cf5 --- /dev/null +++ b/reportupdater/reportupdater/executor.py @@ -0,0 +1,65 @@ + +import MySQLdb +from datetime import datetime + + +TIMESTAMP_FORMAT = '%Y%m%d%H%M%S' +DATE_FORMAT = '%Y-%m-%d' + + +class Executor(object): + + def __init__(self, selector, config): + self.selector = selector + self.config = config + + def run(self): # yields each selected report with `result` set from its SQL query + connections = {} # one connection per db_key, reused across reports; NOTE(review): never closed + for report in self.selector.run(): + sql_query = self.instantiate_sql(report) + if report.db_key not in connections: + db_config = self.config['databases'][report.db_key] + connections[report.db_key] = self.create_connection(db_config) + connection = connections[report.db_key] + report.result = self.execute_sql(sql_query, connection) + yield report + + def instantiate_sql(self, report): # fills {from_timestamp}/{to_timestamp} only for timeboxed reports + if report.is_timeboxed: + return report.sql_template.format( + from_timestamp=report.start.strftime(TIMESTAMP_FORMAT),
to_timestamp=report.end.strftime(TIMESTAMP_FORMAT), + ) + else: + return report.sql_template + + def create_connection(self, db_config): + return 'fake_connection' # WIP stub; the real MySQLdb connection is commented out below + # return MySQLdb.connect( + # host=db_config['host'], + # port=db_config['port'], + # read_default_file=db_config['creds_file'], + # db=db_config['db'], + # charset='utf8', + # use_unicode=True + # ) + + def execute_sql(self, sql_query, connection): + return { # WIP stub: canned result, sql_query is not executed + 'header': [u'timestamp', u'value1', u'value2'], + 'data': {datetime.strptime('2015-02-23', '%Y-%m-%d'): [u'2015-02-23', u'11', u'22']} + } + # cursor = connection.cursor() + # try: + # cursor.execute(sql_query) + # rows = cursor.fetchall() + # header = [field[0] for field in cursor.description] + # except Exception, e: + # pass # error! NOTE(review): swallowing this leaves `rows`/`header` unbound below; re-raise instead + # finally: + # cursor.close() + # data = {} + # for row in rows: + # date = datetime.strptime(row[0], DATE_FORMAT) + # data[date] = row + # return {'header': header, 'data': data} diff --git a/reportupdater/reportupdater/reader.py b/reportupdater/reportupdater/reader.py new file mode 100644 index 0000000..38d76ce --- /dev/null +++ b/reportupdater/reportupdater/reader.py @@ -0,0 +1,78 @@ + +import os +import io +from datetime import datetime +from report import Report + + +DATE_FORMAT = '%Y-%m-%d' + + +class Reader(object): + + def __init__(self, config): + self.config = config + + def run(self): # yields one Report per entry in config['reports'] + for report_key, report_config in self.config['reports'].iteritems(): + report = self.create_report(report_key, report_config) + yield report + + def create_report(self, report_key, report_config): + report = Report() + report.key = report_key + report.frequency = self.get_frequency(report_config) + report.granularity = self.get_granularity(report_config) + report.is_timeboxed = self.get_is_timeboxed(report_config) + report.first_date = self.get_first_date(report_config) + report.db_key = self.config['defaults']['db'] + report.sql_template = self.get_sql_template(report_key) + report.previous_result =
self.get_previous_result(report_key) + return report + + def get_first_date(self, report_config): + try: + return datetime.strptime(report_config['starts'], DATE_FORMAT) + except (KeyError, ValueError) as e: + raise ValueError('Missing or invalid "starts" date: ' + str(e)) + + def get_frequency(self, report_config): + if report_config['frequency'] == 'hourly': + return 'hours' + elif report_config['frequency'] == 'daily': + return 'days' + else: + raise ValueError('Unknown frequency: ' + str(report_config['frequency'])) + + def get_granularity(self, report_config): + if report_config['frequency'] == 'hourly': + return 'days' + elif report_config['frequency'] == 'daily': + return 'months' + else: + raise ValueError('Unknown frequency: ' + str(report_config['frequency'])) + + def get_is_timeboxed(self, report_config): + return report_config.get('timeboxed', False) + + def get_sql_template(self, report_key): + sql_template_path = os.path.join(self.config['sql_folder'], report_key + '.sql') + with io.open(sql_template_path, encoding='utf-8') as sql_template_file: + return sql_template_file.read() + # handle errors + + def get_previous_result(self, report_key): + previous_result = {'header': None, 'data': {}} # {} not None: callers .copy()/.keys() it even before the CSV exists + output_file_path = os.path.join(self.config['output_folder'], report_key + '.csv') + if os.path.exists(output_file_path): + with io.open(output_file_path, encoding='utf-8') as output_file: + lines = output_file.readlines() + header = lines.pop(0).strip().split(',') + data = {} + for line in lines: + row = line.strip().split(',') + date = datetime.strptime(row[0], DATE_FORMAT) + data[date] = row + previous_result['header'] = header + previous_result['data'] = data + return previous_result diff --git a/reportupdater/reportupdater/report.py b/reportupdater/reportupdater/report.py new file mode 100644 index 0000000..0b06968 --- /dev/null +++ b/reportupdater/reportupdater/report.py @@ -0,0 +1,79 @@ + +import re + + +DATE_FORMAT = '%Y-%m-%d' + + +class Report(object): + + def __init__(self): + self.key = None + self.frequency = None + self.granularity = None + self.is_timeboxed = False + self.first_date = None +
self.start = None + self.end = None + self.db_key = None + self.sql_template = None + self.previous_result = {'header': None, 'data': {}} + self.result = {'header': None, 'data': {}} + + def __copy__(self): # copies all fields; result dicts are new but their row lists are shared + other = Report() + other.key = self.key + other.frequency = self.frequency + other.granularity = self.granularity + other.is_timeboxed = self.is_timeboxed + other.first_date = self.first_date + other.start = self.start + other.end = self.end + other.db_key = self.db_key + other.sql_template = self.sql_template + other.previous_result = { + 'header': self.previous_result['header'], + 'data': self.previous_result['data'].copy() + } + other.result = { + 'header': self.result['header'], + 'data': self.result['data'].copy() + } + return other + + def __str__(self): + return ( + '<Report' + + ' key=' + str(self.key) + + ' frequency=' + str(self.frequency) + + ' granularity=' + str(self.granularity) + + ' is_timeboxed=' + str(self.is_timeboxed) + + ' first_date=' + self.format_date(self.first_date) + + ' start=' + self.format_date(self.start) + + ' end=' + self.format_date(self.end) + + ' db_key=' + str(self.db_key) + + ' sql_template=' + self.format_sql(self.sql_template) + + ' previous_result=' + self.format_result(self.previous_result) + + ' result=' + self.format_result(self.result) + + '>' + ) + + def format_date(self, to_format): + if to_format: + return to_format.strftime(DATE_FORMAT) + else: + return str(None) + + def format_result(self, to_format): + if to_format['header']: + header_line = ','.join(to_format['header']) + else: + header_line = str(None) + data_lines = [] + for date, row in sorted(to_format['data'].iteritems()): # chronological order + data_lines.append(','.join(row)) + return '{header: ' + header_line + ', data: ' + str(data_lines) + '}' + + def format_sql(self, to_format): + # TODO: remove comments + return re.sub(r'\s+', ' ', to_format).strip() if to_format else str(None) diff --git a/reportupdater/reportupdater/selector.py b/reportupdater/reportupdater/selector.py new file mode 100644 index
0000000..8f84abf --- /dev/null +++ b/reportupdater/reportupdater/selector.py @@ -0,0 +1,66 @@ + +from copy import copy +from datetime import datetime +from dateutil.relativedelta import relativedelta + + +class Selector(object): + + def __init__(self, reader, config): + self.reader = reader + self.config = config + + def run(self): # yields the due reports; timeboxed ones as one copy per missing period + now = self.config['current_exec_time'] + for report in self.reader.run(): + if self.is_time_to_execute(report, now): + if report.is_timeboxed: + + first_date = self.truncate_date(report.first_date, report.granularity) + current_date = self.truncate_date(now, report.granularity) + increment = self.get_increment(report.granularity) + already_done_dates = set(report.previous_result['data']) # set: O(1) membership per start date + + for start in self.get_all_start_dates(first_date, current_date, increment): + if start == current_date or start not in already_done_dates: # the current period is always recomputed + report_copy = copy(report) + report_copy.start = start + report_copy.end = start + increment + yield report_copy + else: + yield report + + def is_time_to_execute(self, report, now): # True when `now` is in a different frequency period than the last run + last_exec_time = self.config['last_exec_time'] + if last_exec_time: + t1 = self.truncate_date(last_exec_time, report.frequency) + else: + t1 = None + t2 = self.truncate_date(now, report.frequency) + return t1 != t2 + + def truncate_date(self, date, truncate_to): + if truncate_to == 'hours': + return date.replace(minute=0, second=0, microsecond=0) + elif truncate_to == 'days': + return date.replace(hour=0, minute=0, second=0, microsecond=0) + elif truncate_to == 'months': + return date.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # months start on day 1; day=0 is out of range + else: + raise ValueError('Unknown period: ' + str(truncate_to)) + + def get_increment(self, period): + if period == 'hours': + return relativedelta(hours=1) + elif period == 'days': + return relativedelta(days=1) + elif period == 'months': + return relativedelta(months=1) + else: + raise ValueError('Unknown period: ' + str(period))
+ + def get_all_start_dates(self, first_date, current_date, increment): # first_date, then each later step up to current_date inclusive + current_start = first_date + while current_start <= current_date: + yield current_start + current_start += increment diff --git a/reportupdater/reportupdater/update_reports.py b/reportupdater/reportupdater/update_reports.py new file mode 100644 index 0000000..2e550bd --- /dev/null +++ b/reportupdater/reportupdater/update_reports.py @@ -0,0 +1,63 @@ + +import os +import io +from datetime import datetime +from reader import Reader +from selector import Selector +from executor import Executor +from updater import Updater + + +TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S' + + +def main(): + current_exec_time = datetime.now() + last_exec_time = replace_exec_time('history.txt', current_exec_time) # NOTE(review): path is relative to the working directory + config = { + 'sql_folder': '/home/mforns/Projects/limn-edit-data/edit', + 'output_folder': '/home/mforns/Projects/limn-edit-data/datafiles', + 'databases': { + 'el': { + 'host': 'analytics-store.eqiad.wmnet', + 'port': 3306, + 'creds_file': '/a/.my.cnf.research', + 'db': 'log' + } + }, + 'reports': { + 'save_rates_low_noise': { + 'starts': '2015-01-01', + 'frequency': 'hourly', + 'timeboxed': True + } + }, + 'defaults': { + 'db': 'el' + }, + # 'db_lag': 10, + 'current_exec_time': current_exec_time, + 'last_exec_time': last_exec_time + } + reader = Reader(config) + selector = Selector(reader, config) + executor = Executor(selector, config) + updater = Updater(executor, config) + updater.run() + + +def replace_exec_time(history_file_path, current_time): # returns the previous run time (or None) and records current_time + if os.path.exists(history_file_path): + with io.open(history_file_path) as history_file: + last_time_str = history_file.read().strip() + last_time = datetime.strptime(last_time_str, TIMESTAMP_FORMAT) if last_time_str else None + else: + last_time = None + with io.open(history_file_path, 'w') as history_file: # NOTE(review): written before any report runs, so a failed run is not retried within the same period + current_time_str = current_time.strftime(TIMESTAMP_FORMAT) + history_file.write(unicode(current_time_str)) + return last_time + + +if __name__ == '__main__': + main() diff --git
a/reportupdater/reportupdater/updater.py b/reportupdater/reportupdater/updater.py new file mode 100644 index 0000000..c4cf767 --- /dev/null +++ b/reportupdater/reportupdater/updater.py @@ -0,0 +1,27 @@ + +import os +import csv + + +class Updater(object): + + def __init__(self, executor, config): + self.executor = executor + self.config = config + + def run(self): # merges each executed report's rows over its previous rows, then rewrites its CSV + for report in self.executor.run(): + updated_data = report.previous_result['data'].copy() + for date, rows in report.result['data'].iteritems(): + updated_data[date] = rows # new results replace stored rows for the same date + self.write_result(updated_data, report) + + def write_result(self, result_data, report): # writes the new header and the date-sorted rows to <output_folder>/<key>.csv + result_dates = sorted(result_data.keys()) + result_rows = [result_data[date] for date in result_dates] + + output_file_path = os.path.join(self.config['output_folder'], report.key + '.csv') + with open(output_file_path, 'wb') as output_file: # Python 2 csv: open with 'b', the writer emits its own line terminators + csv_writer = csv.writer(output_file) + csv_writer.writerow(report.result['header']) + csv_writer.writerows(result_rows) -- To view, visit https://gerrit.wikimedia.org/r/192319 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I5885cd85499501741b78fbbc95225939dc46b329 Gerrit-PatchSet: 1 Gerrit-Project: analytics/limn-mobile-data Gerrit-Branch: master Gerrit-Owner: Mforns <mfo...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits