Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/192319

Change subject: [WIP]
......................................................................

[WIP]

Bug: T89251
Change-Id: I5885cd85499501741b78fbbc95225939dc46b329
---
A reportupdater/README.md
A reportupdater/reportupdater/__init__.py
A reportupdater/reportupdater/executor.py
A reportupdater/reportupdater/reader.py
A reportupdater/reportupdater/report.py
A reportupdater/reportupdater/selector.py
A reportupdater/reportupdater/update_reports.py
A reportupdater/reportupdater/updater.py
8 files changed, 378 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/limn-mobile-data refs/changes/19/192319/1

diff --git a/reportupdater/README.md b/reportupdater/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/reportupdater/README.md
diff --git a/reportupdater/reportupdater/__init__.py b/reportupdater/reportupdater/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/reportupdater/reportupdater/__init__.py
diff --git a/reportupdater/reportupdater/executor.py b/reportupdater/reportupdater/executor.py
new file mode 100644
index 0000000..e3e5cf5
--- /dev/null
+++ b/reportupdater/reportupdater/executor.py
@@ -0,0 +1,65 @@
+
+import MySQLdb
+from datetime import datetime
+
+
+TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
+DATE_FORMAT = '%Y-%m-%d'
+
+
+class Executor(object):
+
+    def __init__(self, selector, config):
+        self.selector = selector
+        self.config = config
+
+    def run(self):
+        connections = {}
+        for report in self.selector.run():
+            sql_query = self.instantiate_sql(report)
+            if report.db_key not in connections:
+                db_config = self.config['databases'][report.db_key]
+                connections[report.db_key] = self.create_connection(db_config)
+            connection = connections[report.db_key]
+            report.result = self.execute_sql(sql_query, connection)
+            yield report
+
+    def instantiate_sql(self, report):
+        if report.is_timeboxed:
+            return report.sql_template.format(
+                from_timestamp=report.start.strftime(TIMESTAMP_FORMAT),
+                to_timestamp=report.end.strftime(TIMESTAMP_FORMAT),
+            )
+        else:
+            return report.sql_template
+
+    def create_connection(self, db_config):
+        return 'fake_connection'
+        # return MySQLdb.connect(
+        #     host=db_config['host'],
+        #     port=db_config['port'],
+        #     read_default_file=db_config['creds_file'],
+        #     db=db_config['db'],
+        #     charset='utf8',
+        #     use_unicode=True
+        # )
+
+    def execute_sql(self, sql_query, connection):
+        return {
+            'header': [u'timestamp', u'value1', u'value2'],
+            'data': {datetime.strptime('2015-02-23', '%Y-%m-%d'): 
[u'2015-02-23', u'11', u'22']}
+        }
+        # cursor = connection.cursor()
+        # try:
+        #     cursor.execute(sql_query)
+        #     rows = cursor.fetchall()
+        #     header = [field[0] for field in cursor.description]
+        # except Exception, e:
+        #     pass # error!
+        # finally:
+        #     cursor.close()
+        # data = {}
+        # for row in rows:
+        #     date = datetime.strptime(row[0], DATE_FORMAT)
+        #     data[date] = row
+        # return {'header': header, 'data': data}
diff --git a/reportupdater/reportupdater/reader.py b/reportupdater/reportupdater/reader.py
new file mode 100644
index 0000000..38d76ce
--- /dev/null
+++ b/reportupdater/reportupdater/reader.py
@@ -0,0 +1,78 @@
+
+import os
+import io
+from datetime import datetime
+from report import Report
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+class Reader(object):
+
+    def __init__(self, config):
+        self.config = config
+
+    def run(self):
+        for report_key, report_config in self.config['reports'].iteritems():
+            report = self.create_report(report_key, report_config)
+            yield report
+
+    def create_report(self, report_key, report_config):
+        report = Report()
+        report.key = report_key
+        report.frequency = self.get_frequency(report_config)
+        report.granularity = self.get_granularity(report_config)
+        report.is_timeboxed = self.get_is_timeboxed(report_config)
+        report.first_date = self.get_first_date(report_config)
+        report.db_key = self.config['defaults']['db']
+        report.sql_template = self.get_sql_template(report_key)
+        report.previous_result = self.get_previous_result(report_key)
+        return report
+
+    def get_first_date(self, report_config):
+        try:
+            return datetime.strptime(report_config['starts'], DATE_FORMAT)
+        except:
+            pass # error
+
+    def get_frequency(self, report_config):
+        if report_config['frequency'] == 'hourly':
+            return 'hours'
+        elif report_config['frequency'] == 'daily':
+            return 'days'
+        else:
+            pass # error
+
+    def get_granularity(self, report_config):
+        if report_config['frequency'] == 'hourly':
+            return 'days'
+        elif report_config['frequency'] == 'daily':
+            return 'months'
+        else:
+            pass # error
+
+    def get_is_timeboxed(self, report_config):
+        return 'timeboxed' in report_config and report_config['timeboxed']
+
+    def get_sql_template(self, report_key):
+        sql_template_path = os.path.join(self.config['sql_folder'], report_key 
+ '.sql')
+        with io.open(sql_template_path, encoding='utf-8') as sql_template_file:
+            return sql_template_file.read()
+        # handle errors
+
+    def get_previous_result(self, report_key):
+        previous_result = {'header': None, 'data': None}
+        output_file_path = os.path.join(self.config['output_folder'], 
report_key + '.csv')
+        if os.path.exists(output_file_path):
+            with io.open(output_file_path, encoding='utf-8') as output_file:
+                lines = output_file.readlines()
+            header = lines.pop(0).strip().split(',')
+            data = {}
+            for line in lines:
+                date_str = line.split(',')[0]
+                date = datetime.strptime(date_str, DATE_FORMAT)
+                data[date] = line.strip().split(',')
+            previous_result['header'] = header
+            previous_result['data'] = data
+        return previous_result
diff --git a/reportupdater/reportupdater/report.py b/reportupdater/reportupdater/report.py
new file mode 100644
index 0000000..0b06968
--- /dev/null
+++ b/reportupdater/reportupdater/report.py
@@ -0,0 +1,79 @@
+
+import re
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+class Report(object):
+
+    def __init__(self):
+        self.key = None
+        self.frequency = None
+        self.granularity = None
+        self.is_timeboxed = False
+        self.first_date = None
+        self.start = None
+        self.end = None
+        self.db_key = None
+        self.sql_template = None
+        self.previous_result = {'header': None, 'data': {}}
+        self.result = {'header': None, 'data': {}}
+
+    def __copy__(self):
+        other = Report()
+        other.key = self.key
+        other.frequency = self.frequency
+        other.granularity = self.granularity
+        other.is_timeboxed = self.is_timeboxed
+        other.first_date = self.first_date
+        other.start = self.start
+        other.end = self.end
+        other.db_key = self.db_key
+        other.sql_template = self.sql_template
+        other.previous_result = {
+            'header': self.previous_result['header'],
+            'data': self.previous_result['data'].copy()
+        }
+        other.result = {
+            'header': self.result['header'],
+            'data': self.result['data'].copy()
+        }
+        return other
+
+    def __str__(self):
+        return (
+            '<Report' +
+            ' key=' + self.key +
+            ' frequency=' + str(self.frequency) +
+            ' granularity=' + str(self.granularity) +
+            ' is_timeboxed=' + str(self.is_timeboxed) +
+            ' first_date=' + self.format_date(self.first_date) +
+            ' start=' + self.format_date(self.start) +
+            ' end=' + self.format_date(self.end) +
+            ' db_key=' + str(self.db_key) +
+            ' sql_template=' + self.format_sql(self.sql_template) +
+            ' previous_result=' + self.format_result(self.previous_result) +
+            ' result=' + self.format_result(self.result) +
+            '>'
+        )
+
+    def format_date(self, to_format):
+        if to_format:
+            return to_format.strftime(DATE_FORMAT)
+        else:
+            return str(None)
+
+    def format_result(self, to_format):
+        if to_format['header']:
+            header_line = ','.join(to_format['header'])
+        else:
+            header_line = str(None)
+        data_lines = []
+        for date, row in to_format['data'].iteritems():
+            data_lines.append(','.join(row))
+        return '{header: ' + header_line + ', data: ' + str(data_lines) + '}'
+
+    def format_sql(self, to_format):
+        # TODO: remove comments
+        return re.sub(r'\s+', ' ', to_format).strip()
diff --git a/reportupdater/reportupdater/selector.py b/reportupdater/reportupdater/selector.py
new file mode 100644
index 0000000..8f84abf
--- /dev/null
+++ b/reportupdater/reportupdater/selector.py
@@ -0,0 +1,66 @@
+
+from copy import copy
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+
+class Selector(object):
+
+    def __init__(self, reader, config):
+        self.reader = reader
+        self.config = config
+
+    def run(self):
+        now = self.config['current_exec_time']
+        for report in self.reader.run():
+            if self.is_time_to_execute(report, now):
+                if report.is_timeboxed:
+
+                    first_date = self.truncate_date(report.first_date, 
report.granularity)
+                    current_date = self.truncate_date(now, report.granularity)
+                    increment = self.get_increment(report.granularity)
+                    already_done_dates = report.previous_result['data'].keys()
+
+                    for start in self.get_all_start_dates(first_date, 
current_date, increment):
+                        if start == current_date or start not in 
already_done_dates:
+                            report_copy = copy(report)
+                            report_copy.start = start
+                            report_copy.end = start + increment
+                            yield report_copy
+                else:
+                    yield report
+
+    def is_time_to_execute(self, report, now):
+        last_exec_time = self.config['last_exec_time']
+        if last_exec_time:
+            t1 = self.truncate_date(last_exec_time, report.frequency)
+        else:
+            t1 = None
+        t2 = self.truncate_date(now, report.frequency)
+        return t1 != t2
+
+    def truncate_date(self, date, truncate_to):
+        if truncate_to == 'hours':
+            return date.replace(minute=0, second=0, microsecond=0)
+        if truncate_to == 'days':
+            return date.replace(hour=0, minute=0, second=0, microsecond=0)
+        elif truncate_to == 'months':
+            return date.replace(day=0, hour=0, minute=0, second=0, 
microsecond=0)
+        else:
+            pass # error!
+
+    def get_increment(self, period):
+        if period == 'hours':
+            return relativedelta(hours=1)
+        elif period == 'days':
+            return relativedelta(days=1)
+        elif period == 'months':
+            return relativedelta(months=1)
+        else:
+            pass # error!
+
+    def get_all_start_dates(self, first_date, current_date, increment):
+        current_start = first_date
+        while current_start <= current_date:
+            yield current_start
+            current_start += increment
diff --git a/reportupdater/reportupdater/update_reports.py b/reportupdater/reportupdater/update_reports.py
new file mode 100644
index 0000000..2e550bd
--- /dev/null
+++ b/reportupdater/reportupdater/update_reports.py
@@ -0,0 +1,63 @@
+
+import os
+import io
+from datetime import datetime
+from reader import Reader
+from selector import Selector
+from executor import Executor
+from updater import Updater
+
+
+TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S'
+
+
+def main():
+    current_exec_time = datetime.now()
+    last_exec_time = replace_exec_time('history.txt', current_exec_time)
+    config = {
+        'sql_folder': '/home/mforns/Projects/limn-edit-data/edit',
+        'output_folder': '/home/mforns/Projects/limn-edit-data/datafiles',
+        'databases': {
+            'el': {
+                'host': 'analytics-store.eqiad.wmnet',
+                'port': 3306,
+                'creds_file': '/a/.my.cnf.research',
+                'db': 'log'
+            }
+        },
+        'reports': {
+            'save_rates_low_noise': {
+                'starts': '2015-01-01',
+                'frequency': 'hourly',
+                'timeboxed': True
+            }
+        },
+        'defaults': {
+            'db': 'el'
+        },
+        # 'db_lag': 10,
+        'current_exec_time': current_exec_time,
+        'last_exec_time': last_exec_time
+    }
+    reader = Reader(config)
+    selector = Selector(reader, config)
+    executor = Executor(selector, config)
+    updater = Updater(executor, config)
+    updater.run()
+
+
+def replace_exec_time(history_file_path, current_time):
+    if os.path.exists(history_file_path):
+        with io.open(history_file_path) as history_file:
+            last_time_str = history_file.read().strip()
+            last_time = datetime.strptime(last_time_str, TIMESTAMP_FORMAT)
+    else:
+        last_time = None
+    with io.open(history_file_path, 'w') as history_file:
+        current_time_str = current_time.strftime(TIMESTAMP_FORMAT)
+        history_file.write(unicode(current_time_str))
+    return last_time
+
+
+if __name__ == '__main__':
+    main()
diff --git a/reportupdater/reportupdater/updater.py b/reportupdater/reportupdater/updater.py
new file mode 100644
index 0000000..c4cf767
--- /dev/null
+++ b/reportupdater/reportupdater/updater.py
@@ -0,0 +1,27 @@
+
+import os
+import csv
+
+
+class Updater(object):
+
+    def __init__(self, executor, config):
+        self.executor = executor
+        self.config = config
+
+    def run(self):
+        for report in self.executor.run():
+            updated_data = report.previous_result['data'].copy()
+            for date, rows in report.result['data'].iteritems():
+                updated_data[date] = rows
+            self.write_result(updated_data, report)
+
+    def write_result(self, result_data, report):
+        result_dates = sorted(result_data.keys())
+        result_rows = [result_data[date] for date in result_dates]
+
+        output_file_path = os.path.join(self.config['output_folder'], 
report.key + '.csv')
+        with open(output_file_path, 'w') as output_file:
+            csv_writer = csv.writer(output_file)
+            csv_writer.writerow(report.result['header'])
+            csv_writer.writerows(result_rows)

-- 
To view, visit https://gerrit.wikimedia.org/r/192319
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5885cd85499501741b78fbbc95225939dc46b329
Gerrit-PatchSet: 1
Gerrit-Project: analytics/limn-mobile-data
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to