Milimetric has submitted this change and it was merged.
Change subject: Add max_data_points option
......................................................................
Add max_data_points option
This feature allows specifying a maximum size for the report using
the config option 'max_data_points=N', where N is an integer >= 1.
So, if N=5, the report will only hold the last 5 data points.
Note that this depends on the granularity, N=5 means last 5 weeks
for weekly granularity, and last 5 days for daily granularity, etc.
Bug: T131849
Change-Id: I99683d3ac3c54361d75d6fe4a1c72f3070312d30
---
M reportupdater/reader.py
M reportupdater/report.py
M reportupdater/selector.py
M reportupdater/utils.py
M reportupdater/writer.py
M test/reader_test.py
M test/selector_test.py
M test/writer_test.py
8 files changed, 144 insertions(+), 74 deletions(-)
Approvals:
Milimetric: Verified; Looks good to me, approved
diff --git a/reportupdater/reader.py b/reportupdater/reader.py
index 1fb0a2c..3f4a52f 100644
--- a/reportupdater/reader.py
+++ b/reportupdater/reader.py
@@ -61,6 +61,7 @@
report.is_funnel = self.get_is_funnel(report_config)
report.first_date = self.get_first_date(report_config)
report.explode_by = self.get_explode_by(report_config)
+ report.max_data_points = self.get_max_data_points(report_config)
executable = self.get_executable(report_config) or report_key
if report.type == 'sql':
report.db_key = self.get_db_key(report_config)
@@ -154,6 +155,15 @@
return explode_by
+ def get_max_data_points(self, report_config):
+ if 'max_data_points' not in report_config:
+ return None
+ max_data_points = report_config['max_data_points']
+ if type(max_data_points) != int or max_data_points < 1:
+ raise ValueError('Max data points is not valid.')
+ return max_data_points
+
+
def get_executable(self, report_config):
if 'execute' not in report_config:
return None
diff --git a/reportupdater/report.py b/reportupdater/report.py
index c58c4ea..04c353e 100644
--- a/reportupdater/report.py
+++ b/reportupdater/report.py
@@ -27,6 +27,7 @@
self.sql_template = None
self.script = None
self.explode_by = {}
+ self.max_data_points = None
self.results = {'header': [], 'data': {}}
@@ -45,6 +46,7 @@
' sql_template=' + self.format_sql(self.sql_template) +
' script=' + str(self.script) +
' explode_by=' + str(self.explode_by) +
+ ' max_data_points=' + str(self.max_data_points) +
' results=' + self.format_results(self.results) +
'>'
)
diff --git a/reportupdater/selector.py b/reportupdater/selector.py
index c402b1f..94e690e 100644
--- a/reportupdater/selector.py
+++ b/reportupdater/selector.py
@@ -14,7 +14,7 @@
from datetime import datetime
from dateutil.relativedelta import relativedelta
from reader import Reader
-from utils import raise_critical, get_previous_results
+from utils import raise_critical, get_previous_results, get_increment
class Selector(object):
@@ -57,7 +57,7 @@
first_date = self.truncate_date(report.first_date, report.granularity)
lag_increment = relativedelta(seconds=report.lag)
- granularity_increment = self.get_increment(report.granularity)
+ granularity_increment = get_increment(report.granularity)
relative_now = now - lag_increment - granularity_increment
last_date = self.truncate_date(relative_now, report.granularity)
previous_results = get_previous_results(report, output_folder)
@@ -83,19 +83,6 @@
return self.truncate_date(date, 'days') - passed_weekdays
elif period == 'months':
return date.replace(day=1, hour=0, minute=0, second=0,
microsecond=0)
- else:
- raise ValueError('Period is not valid.')
-
-
- def get_increment(self, period):
- if period == 'hours':
- return relativedelta(hours=1)
- elif period == 'days':
- return relativedelta(days=1)
- elif period == 'weeks':
- return relativedelta(days=7)
- elif period == 'months':
- return relativedelta(months=1)
else:
raise ValueError('Period is not valid.')
diff --git a/reportupdater/utils.py b/reportupdater/utils.py
index 75ddf8a..21cdcec 100644
--- a/reportupdater/utils.py
+++ b/reportupdater/utils.py
@@ -9,6 +9,7 @@
import logging
from datetime import datetime
from collections import defaultdict
+from dateutil.relativedelta import relativedelta
DATE_FORMAT = '%Y-%m-%d'
@@ -94,3 +95,16 @@
def ensure_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
+
+
+def get_increment(period):
+ if period == 'hours':
+ return relativedelta(hours=1)
+ elif period == 'days':
+ return relativedelta(days=1)
+ elif period == 'weeks':
+ return relativedelta(days=7)
+ elif period == 'months':
+ return relativedelta(months=1)
+ else:
+ raise ValueError('Period is not valid.')
diff --git a/reportupdater/writer.py b/reportupdater/writer.py
index 00dc290..731d7a5 100644
--- a/reportupdater/writer.py
+++ b/reportupdater/writer.py
@@ -8,9 +8,11 @@
import io
import csv
import logging
+from copy import copy, deepcopy
from executor import Executor
from utils import (raise_critical, get_previous_results,
- DATE_FORMAT, get_exploded_report_output_path)
+ DATE_FORMAT, get_exploded_report_output_path,
+ get_increment)
class Writer(object):
@@ -51,73 +53,66 @@
def update_results(self, report):
- header = report.results['header']
+ # Get current results.
+ current_header = copy(report.results['header'])
+ current_data = deepcopy(report.results['data'])
+ for date in current_data:
+ rows = current_data[date] if report.is_funnel else
[current_data[date]]
+ for row in rows:
+ if len(row) != len(current_header):
+ raise ValueError('Results and header do not match.')
+
+ # Get previous results.
previous_results = get_previous_results(report,
self.get_output_folder())
previous_header = previous_results['header']
-
- updated_data = {}
-
- # Handle the first run case
+ previous_data = previous_results['data']
if not previous_header:
- if not previous_results['data']:
- previous_header = header
+ if not previous_data:
+ previous_header = current_header
else:
- raise ValueError('Previous results have no header')
+ raise ValueError('Previous results have no header.')
- # New results may have a different header than previous results.
+ # Current results may have a different header than previous results.
# They may contain new columns, column order changes, or removal
- # of some columns. In the latter case, the previous data will be
- # kept intact and the None value will be assigned to the missing
- # columns of the new data.
- if header != previous_header:
- # Fill in the values for removed columns.
- removed_columns = sorted(list(set(previous_header) - set(header)))
+ # of some columns.
+ if current_header != previous_header:
+
+ # Rewrite current header and data to include removed columns.
+ removed_columns = sorted(list(set(previous_header) -
set(current_header)))
if removed_columns:
- header.extend(removed_columns)
- new_data = report.results['data']
- for date in new_data:
- rows = new_data[date] if report.is_funnel else
[new_data[date]]
+ current_header.extend(removed_columns)
+ for date in current_data:
+ rows = current_data[date] if report.is_funnel else
[current_data[date]]
for row in rows:
row.extend([None] * len(removed_columns))
- # make a map to use when updating old rows to new rows
- old_columns = set(header).intersection(set(previous_header))
- new_indexes = {
- header.index(col): previous_header.index(col)
- for col in old_columns
- }
+ # Make a map to use when updating previous data column order.
+ column_map = [
+ (current_header.index(col), previous_header.index(col))
+ for col in
set(current_header).intersection(set(previous_header))
+ ]
- # rewrite previous results if there are new columns
- for date, rows in previous_results['data'].items():
- rows_with_nulls = []
- iteratee = rows
- if not report.is_funnel:
- iteratee = [rows]
- for row in iteratee:
- updated_row = [None] * len(header)
- for new_index, old_index in new_indexes.items():
- updated_row[new_index] = row[old_index]
-
- if report.is_funnel:
- rows_with_nulls.append(updated_row)
- else:
- rows_with_nulls = updated_row
-
- updated_data[date] = rows_with_nulls
- else:
- updated_data = previous_results['data']
-
- for date, rows in report.results['data'].iteritems():
- updated_data[date] = rows
- if report.is_funnel:
+ # Rewrite previous data in the new order and including new columns.
+ for date in previous_data:
+ rows = previous_data[date] if report.is_funnel else
[previous_data[date]]
+ rewritten_rows = []
for row in rows:
- if len(row) != len(header):
- raise ValueError('Results and Header do not match')
- else:
- if len(rows) != len(header):
- raise ValueError('Results and Header do not match')
+ rewritten_row = [None] * len(current_header)
+ for new_index, old_index in column_map:
+ rewritten_row[new_index] = row[old_index]
+ rewritten_rows.append(rewritten_row)
+ previous_data[date] = rewritten_rows if report.is_funnel else
rewritten_rows[0]
- return header, updated_data
+ # Build final updated data.
+ updated_header = current_header
+ updated_data = {}
+ date_threshold = self.get_date_threshold(report)
+ for date in previous_data:
+ if not date_threshold or date > date_threshold:
+ updated_data[date] = previous_data[date]
+ updated_data.update(current_data)
+
+ return updated_header, updated_data
def write_results(self, header, data, report, output_folder):
@@ -151,3 +146,10 @@
os.rename(temp_output_path, output_path)
except Exception, e:
raise RuntimeError('Could not rename the output file (' + str(e) +
').')
+
+
+ def get_date_threshold(self, report):
+ if not report.max_data_points:
+ return None
+ increment = get_increment(report.granularity)
+ return report.start - report.max_data_points * increment
diff --git a/test/reader_test.py b/test/reader_test.py
index 10a5036..102af69 100644
--- a/test/reader_test.py
+++ b/test/reader_test.py
@@ -247,6 +247,27 @@
self.assertEqual(result, expected)
+ def test_get_max_data_points_when_not_in_config(self):
+ result = self.reader.get_max_data_points({})
+ self.assertEqual(result, None)
+
+
+ def test_get_max_data_points_not_an_int_or_not_positive(self):
+ report_config = {'max_data_points': 'not an int'}
+ with self.assertRaises(ValueError):
+ self.reader.get_max_data_points(report_config)
+ report_config = {'max_data_points': 0}
+ with self.assertRaises(ValueError):
+ self.reader.get_max_data_points(report_config)
+
+
+ def test_get_max_data_points(self):
+ max_data_points = 10
+ report_config = {'max_data_points': max_data_points}
+ result = self.reader.get_max_data_points(report_config)
+ self.assertEqual(result, max_data_points)
+
+
def test_get_executable_when_not_in_config(self):
result = self.reader.get_executable({})
self.assertEqual(result, None)
diff --git a/test/selector_test.py b/test/selector_test.py
index eb41d0e..0232703 100644
--- a/test/selector_test.py
+++ b/test/selector_test.py
@@ -3,6 +3,7 @@
from reportupdater.selector import Selector
from reportupdater.reader import Reader
from reportupdater.report import Report
+from reportupdater.utils import get_increment
from unittest import TestCase
from mock import MagicMock
from datetime import datetime
@@ -101,23 +102,23 @@
def test_get_increment_when_period_is_days(self):
- increment = self.selector.get_increment('days')
+ increment = get_increment('days')
self.assertEqual(increment, relativedelta(days=1))
def test_get_increment_when_period_is_weeks(self):
- increment = self.selector.get_increment('weeks')
+ increment = get_increment('weeks')
self.assertEqual(increment, relativedelta(days=7))
def test_get_increment_when_period_is_months(self):
- increment = self.selector.get_increment('months')
+ increment = get_increment('months')
self.assertEqual(increment, relativedelta(months=1))
def test_get_increment_when_period_is_invalid(self):
with self.assertRaises(ValueError):
- self.selector.get_increment('notvalid')
+ get_increment('notvalid')
def
test_get_all_start_dates_when_first_date_is_greater_than_current_date(self):
diff --git a/test/writer_test.py b/test/writer_test.py
index 87fc02e..449156c 100644
--- a/test/writer_test.py
+++ b/test/writer_test.py
@@ -262,6 +262,39 @@
self.assertEqual(updated_data[old_date], [old_date, '2', None, '1',
'3', None])
+ def test_update_results_when_max_data_points_is_set(self):
+ # see setUp for the fake data written to this report output
+ self.report.key = 'writer_test_header_change'
+
+ new_date = datetime(2015, 1, 2)
+ new_row = [new_date, 1, 2, 3]
+ self.report.max_data_points = 1
+ self.report.granularity = 'days'
+ self.report.start = new_date
+ self.report.results = {
+ 'header': ['date', 'val1', 'val2', 'val3'], # no changes
+ 'data': {new_date: new_row}
+ }
+ header, updated_data = self.writer.update_results(self.report)
+ self.assertEqual(len(updated_data), 1)
+ self.assertTrue(new_date in updated_data)
+ self.assertEqual(updated_data[new_date], new_row)
+
+
+ def test_get_date_threshold_when_max_data_points_is_not_specified(self):
+ date_threshold = self.writer.get_date_threshold(self.report)
+ self.assertEqual(date_threshold, None)
+
+
+ def test_get_date_threshold(self):
+ self.report.max_data_points = 3
+ self.report.start = datetime(2015, 1, 1)
+ self.report.granularity = 'days'
+ expected = datetime(2014, 12, 29)
+ result = self.writer.get_date_threshold(self.report)
+ self.assertEqual(result, expected)
+
+
def test_run_when_helper_method_raises_error(self):
executed = [self.report]
self.writer.executor.run = MagicMock(return_value=executed)
--
To view, visit https://gerrit.wikimedia.org/r/281815
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I99683d3ac3c54361d75d6fe4a1c72f3070312d30
Gerrit-PatchSet: 1
Gerrit-Project: analytics/reportupdater
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits