Nuria has submitted this change and it was merged.

Change subject: Add null values when query returns no results
......................................................................


Add null values when query returns no results

Before, when the query or script returned no results (just header),
RU didn't update the report file for that time period, leaving a
blank in the report. This was on purpose and intended to give
another opportunity for the data to be backfilled when ready.

But in practice, when a query does not return any results at first,
it usually does not return any results in subsequent retries.
So, the corresponding slot in the report file remains empty
forever, and the query is run every time RU is executed,
using unnecessary resources.

This change makes RU output a line with null values when a query
or script returns no results, thus avoiding unnecessary
recomputations. If the data for a query is not ready at the time
RU triggers by default, the lag option should be used to delay
the execution by a given number of seconds.

Bug: T117537
Change-Id: Ic1790aed5df600759999b617d5900ad9a80085e0
---
M reportupdater/executor.py
M test/executor_test.py
2 files changed, 120 insertions(+), 107 deletions(-)

Approvals:
  Nuria: Verified; Looks good to me, approved



diff --git a/reportupdater/executor.py b/reportupdater/executor.py
index c44d0aa..95db060 100644
--- a/reportupdater/executor.py
+++ b/reportupdater/executor.py
@@ -10,6 +10,7 @@
 import subprocess
 import csv
 import os
+from copy import copy
 from datetime import datetime, date
 from selector import Selector
 from collections import defaultdict
@@ -51,7 +52,8 @@
             if report.db_key not in self.connections:
                 self.connections[report.db_key] = 
self.create_connection(report.db_key)
             connection = self.connections[report.db_key]
-            report.results = self.execute_sql(sql_query, connection, 
report.is_funnel)
+            header, data = self.execute_sql(sql_query, connection)
+            report.results = self.normalize_results(report, header, data)
             return True
         except Exception, e:
             message = ('Report "{report_key}" could not be executed '
@@ -115,32 +117,17 @@
             raise RuntimeError('MySQLdb can not connect to database (' + 
str(e) + ').')
 
 
-    def execute_sql(self, sql_query, connection, is_funnel=False):
+    def execute_sql(self, sql_query, connection):
         cursor = connection.cursor()
         try:
             cursor.execute(sql_query)
-            rows = cursor.fetchall()
+            data = cursor.fetchall()
             header = [field[0] for field in cursor.description]
         except Exception, e:
             raise RuntimeError('MySQLdb can not execute query (' + str(e) + 
').')
         finally:
             cursor.close()
-        if is_funnel:
-            data = defaultdict(list)
-        else:
-            data = {}
-        for row in rows:
-            sql_date = row[0]
-            if not isinstance(sql_date, date):
-                raise ValueError('Query results do not have date values in 
first column.')
-            # normalize to datetime
-            row = list(row)  # make row item assignable
-            row[0] = datetime(sql_date.year, sql_date.month, sql_date.day, 0, 
0, 0, 0)
-            if is_funnel:
-                data[row[0]].append(row)
-            else:
-                data[row[0]] = row
-        return {'header': header, 'data': data}
+        return header, data
 
 
     def execute_script_report(self, report):
@@ -157,33 +144,52 @@
         # it will be the absolute path to itself
         # NOTE: wouldn't this be available to the script anyway?
         parameters.append(os.path.dirname(report.script))
-        # execute the script, store its output in a pipe
         try:
+            # Execute the script, parse the results and normalize them.
             process = subprocess.Popen(parameters, stdout=subprocess.PIPE)
-        except OSError, e:
+            tsv_reader = csv.reader(process.stdout, delimiter='\t')
+            report.results = self.normalize_results(report, None, tsv_reader)
+        except Exception, e:
             message = ('Report "{report_key}" could not be executed '
                        'because of error: {error}')
             logging.error(message.format(report_key=report.key, error=str(e)))
             return False
-        # parse the results into the report object
-        header = None
-        if report.is_funnel:
-            data = defaultdict(list)
-        else:
-            data = {}
-        tsv_reader = csv.reader(process.stdout, delimiter='\t')
-        for row in tsv_reader:
-            if header is None:  # first row
-                header = row
-            else:  # other rows
-                try:
-                    row[0] = datetime.strptime(row[0], DATE_FORMAT)
-                except ValueError:
-                    logging.error('Query results do not have date values in 
first column.')
-                    return False
-                if report.is_funnel:
-                    data[row[0]].append(row)
-                else:
-                    data[row[0]] = row
-        report.results = {'header': header, 'data': data}
         return True
+
+
+    def normalize_results(self, report, header, data):
+        normalized_header = copy(header)
+        normalized_data = defaultdict(list) if report.is_funnel else {}
+
+        for row in data:
+            # If the header was not explicitly passed, use the first row.
+            if normalized_header is None:
+                normalized_header = row
+                continue
+
+            # Parse the date in the first column.
+            raw_date = row[0]
+            if isinstance(raw_date, date):
+                normalized_date = datetime(raw_date.year, raw_date.month,
+                                           raw_date.day, 0, 0, 0, 0)
+            elif isinstance(raw_date, str):
+                try:
+                    normalized_date = datetime.strptime(raw_date, DATE_FORMAT)
+                except:
+                    raise ValueError('Could not parse date from results.')
+            else:
+                raise ValueError('Results do not have dates in first column.')
+
+            # Build the normalized data.
+            normalized_row = [normalized_date] + list(row[1:])
+            if report.is_funnel:
+                normalized_data[normalized_date].append(normalized_row)
+            else:
+                normalized_data[normalized_date] = normalized_row
+
+        # If there's no data, store a row with null values to avoid 
recomputation.
+        if len(normalized_data) == 0:
+            empty_row = [report.start] + [None] * (len(normalized_header) - 1)
+            normalized_data[report.start] = [empty_row] if report.is_funnel 
else empty_row
+
+        return {'header': normalized_header, 'data': normalized_data}
diff --git a/test/executor_test.py b/test/executor_test.py
index f5569d1..da89b33 100644
--- a/test/executor_test.py
+++ b/test/executor_test.py
@@ -152,45 +152,6 @@
             self.executor.execute_sql('SOME sql;', connection)
 
 
-    def test_execute_sql_when_first_column_is_not_a_date(self):
-        def fetchall_callback():
-            return [
-                [date(2015, 1, 1), '1'],
-                ['bad formated date', '2']
-            ]
-        connection = ConnectionMock(None, fetchall_callback, [])
-        with self.assertRaises(ValueError):
-            self.executor.execute_sql('SOME sql;', connection)
-
-
-    def test_execute_sql_with_funnel_data(self):
-        def fetchall_callback():
-            return [
-                [date(2015, 1, 1), '1'],
-                [date(2015, 1, 1), '2'],
-                [date(2015, 1, 1), '3'],
-                [date(2015, 1, 2), '4'],
-                [date(2015, 1, 2), '5']
-            ]
-        connection = ConnectionMock(None, fetchall_callback, [])
-        result = self.executor.execute_sql('SOME sql;', connection, 
is_funnel=True)
-        expected = {
-            'header': [],
-            'data': {
-                datetime(2015, 1, 1): [
-                    [datetime(2015, 1, 1), '1'],
-                    [datetime(2015, 1, 1), '2'],
-                    [datetime(2015, 1, 1), '3']
-                ],
-                datetime(2015, 1, 2): [
-                    [datetime(2015, 1, 2), '4'],
-                    [datetime(2015, 1, 2), '5']
-                ]
-            }
-        }
-        self.assertEqual(result, expected)
-
-
     def test_execute_sql(self):
         def fetchall_callback():
             return [
@@ -199,13 +160,7 @@
             ]
         connection = ConnectionMock(None, fetchall_callback, [])
         result = self.executor.execute_sql('SOME sql;', connection)
-        expected = {
-            'header': [],
-            'data': {
-                datetime(2015, 1, 1): [datetime(2015, 1, 1), '1'],
-                datetime(2015, 1, 2): [datetime(2015, 1, 2), '2'],
-            }
-        }
+        expected = ([], [[date(2015, 1, 1), '1'], [date(2015, 1, 2), '2']])
         self.assertEqual(result, expected)
 
 
@@ -271,20 +226,6 @@
         self.assertEqual(success, False)
 
 
-    def test_execute_script_when_script_output_is_invalid(self):
-        class PopenReturnMock():
-            def __init__(self):
-                self.stdout = ['date\tvalue', 'invalid_date\t1']
-
-        def subprocess_popen_mock(parameters, **kwargs):
-            return PopenReturnMock()
-        subprocess_popen_stash = subprocess.Popen
-        subprocess.Popen = MagicMock(wraps=subprocess_popen_mock)
-        success = self.executor.execute_script_report(self.report)
-        subprocess.Popen = subprocess_popen_stash
-        self.assertEqual(success, False)
-
-
     def test_execute_script(self):
         class PopenReturnMock():
             def __init__(self):
@@ -302,16 +243,82 @@
         self.assertEqual(self.report.results['data'], expected_data)
 
 
+    def test_normalize_results_when_header_is_not_set(self):
+        data = [['date', 'col1', 'col2'], ['2016-01-01', 1, 2]]
+        results = self.executor.normalize_results(self.report, None, data)
+        expected = {
+            'header': ['date', 'col1', 'col2'],
+            'data': {
+                datetime(2016, 1, 1): [datetime(2016, 1, 1), 1, 2]
+            }
+        }
+        self.assertEqual(results, expected)
+
+
+    def test_normalize_results_when_first_column_is_not_a_date(self):
+        header = ['date', 'col1', 'col2']
+        data = [
+            [date(2015, 1, 1), 1, 2],
+            ['bad formated date', 1, 2]
+        ]
+        with self.assertRaises(ValueError):
+            self.executor.normalize_results(self.report, header, data)
+
+
+    def test_normalize_results_when_data_is_empty(self):
+        header = ['date', 'col1', 'col2']
+        data = []
+        results = self.executor.normalize_results(self.report, header, data)
+        expected = {
+            'header': ['date', 'col1', 'col2'],
+            'data': {
+                datetime(2015, 1, 1): [datetime(2015, 1, 1), None, None]
+            }
+        }
+        self.assertEqual(results, expected)
+
+    def test_normalize_results_with_funnel_data(self):
+        header = ['date', 'val']
+        data = [
+            [date(2015, 1, 1), '1'],
+            [date(2015, 1, 1), '2'],
+            [date(2015, 1, 1), '3'],
+            [date(2015, 1, 2), '4'],
+            [date(2015, 1, 2), '5']
+        ]
+        self.report.is_funnel = True
+        results = self.executor.normalize_results(self.report, header, data)
+        expected = {
+            'header': ['date', 'val'],
+            'data': {
+                datetime(2015, 1, 1): [
+                    [datetime(2015, 1, 1), '1'],
+                    [datetime(2015, 1, 1), '2'],
+                    [datetime(2015, 1, 1), '3']
+                ],
+                datetime(2015, 1, 2): [
+                    [datetime(2015, 1, 2), '4'],
+                    [datetime(2015, 1, 2), '5']
+                ]
+            }
+        }
+        self.assertEqual(results, expected)
+
+
     def test_run(self):
         selected = [self.report]
         self.executor.selector.run = MagicMock(return_value=selected)
         self.executor.create_connection = MagicMock(return_value='connection')
-        results = {
-            'header': ['some', 'sql', 'header'],
-            'data': {datetime(2015, 1, 1): [date(2015, 1, 1), 'some', 'value']}
-        }
+        results = (
+            ['some', 'sql', 'header'],
+            [[date(2015, 1, 1), 'some', 'value']]
+        )
         self.executor.execute_sql = MagicMock(return_value=results)
         executed = list(self.executor.run())
         self.assertEqual(len(executed), 1)
         report = executed[0]
-        self.assertEqual(report.results, results)
+        expected = {
+            'header': ['some', 'sql', 'header'],
+            'data': {datetime(2015, 1, 1): [datetime(2015, 1, 1), 'some', 
'value']}
+        }
+        self.assertEqual(report.results, expected)

-- 
To view, visit https://gerrit.wikimedia.org/r/284162
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic1790aed5df600759999b617d5900ad9a80085e0
Gerrit-PatchSet: 1
Gerrit-Project: analytics/reportupdater
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to