Milimetric has submitted this change and it was merged.

Change subject: Setup celery task workflow to handle running reports for the 
ProgramMetrics API
......................................................................


Setup celery task workflow to handle running reports for the ProgramMetrics API

The form to trigger the Program Metrics reports can be found at
/reports/program-global-metrics

Bug:T118308
Change-Id: I1d4a6bba9f81962d01990754299a22a077ba61df
---
M tests/test_controllers/test_reports.py
A tests/test_models/test_run_program_metrics_report.py
M wikimetrics/api/__init__.py
M wikimetrics/api/cohorts.py
A wikimetrics/api/reports.py
M wikimetrics/controllers/cohorts.py
M wikimetrics/controllers/reports.py
M wikimetrics/forms/__init__.py
D wikimetrics/forms/global_metrics_form.py
A wikimetrics/forms/program_metrics_form.py
M wikimetrics/models/report_nodes/__init__.py
M wikimetrics/models/report_nodes/report.py
A wikimetrics/models/report_nodes/run_program_metrics_report.py
M wikimetrics/models/report_nodes/run_report.py
M wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
A wikimetrics/models/report_nodes/validate_program_metrics_report.py
R wikimetrics/static/js/programMetricsReportUpload.js
R wikimetrics/templates/forms/program_metrics_report_upload.html
R wikimetrics/templates/program_metrics_reports.html
19 files changed, 715 insertions(+), 145 deletions(-)

Approvals:
  Milimetric: Verified; Looks good to me, approved
  jenkins-bot: Verified



diff --git a/tests/test_controllers/test_reports.py 
b/tests/test_controllers/test_reports.py
index 8703186..509e7f7 100644
--- a/tests/test_controllers/test_reports.py
+++ b/tests/test_controllers/test_reports.py
@@ -2,7 +2,6 @@
 import celery
 import time
 import unittest
-import os.path
 from mock import Mock, MagicMock, patch
 from contextlib import contextmanager
 from flask import appcontext_pushed, g
@@ -19,7 +18,7 @@
 from wikimetrics.controllers.reports import (
     get_celery_task,
 )
-from wikimetrics.configurables import app, get_absolute_path, db, queue
+from wikimetrics.configurables import app, queue
 
 
 @contextmanager
@@ -788,3 +787,56 @@
         mock_report = ReportStore()
         failure = mock_report.get_result_safely('')
         assert_equal(failure['failure'], 'result not available')
+
+
+class ProgramMetricsReportControllerTest(ControllerAsyncTest):
+
+    def setUp(self):
+        ControllerAsyncTest.setUp(self)
+
+    def test_report_fails_with_invalid_cohort(self):
+        response = self.client.post('/reports/program-global-metrics', 
data=dict(
+            name='TestCohort1',
+            project='wiki',
+            centralauth=True,
+            validate_as_user_ids=False,
+            paste_ids_or_names='test-specific-0\ntest-specific-1',
+            start_date='2015-11-01 00:00:00',
+            end_date='2015-11-30 00:00:00',
+        ))
+
+        assert_equal(response.status_code, 302)
+        assert_true(response.data.find('/reports/') >= 0)
+
+        # Wait for the task to get processed
+        time.sleep(3)
+
+        response = self.client.get('/reports/list/')
+        parsed = json.loads(response.data)
+        result_key = parsed['reports'][-1]['result_key']
+        task, report = get_celery_task(result_key)
+        assert_true(task is not None)
+
+    def test_report_create_and_result(self):
+
+        response = self.client.post('/reports/program-global-metrics', 
data=dict(
+            name='TestCohort2',
+            project='wiki',
+            centralauth=True,
+            validate_as_user_ids=False,
+            paste_ids_or_names='Editor test-specific-0\nEditor 
test-specific-1',
+            start_date='2015-11-01 00:00:00',
+            end_date='2015-11-30 00:00:00',
+        ))
+
+        assert_equal(response.status_code, 302)
+        assert_true(response.data.find('/reports/') >= 0)
+
+        # Wait for the task to get processed
+        time.sleep(3)
+
+        response = self.client.get('/reports/list/')
+        parsed = json.loads(response.data)
+        result_key = parsed['reports'][-1]['result_key']
+        task, report = get_celery_task(result_key)
+        assert_true(task is not None)
diff --git a/tests/test_models/test_run_program_metrics_report.py 
b/tests/test_models/test_run_program_metrics_report.py
new file mode 100644
index 0000000..3b5206f
--- /dev/null
+++ b/tests/test_models/test_run_program_metrics_report.py
@@ -0,0 +1,120 @@
+from nose.tools import assert_equals, raises, assert_true, assert_is_not_none
+
+from tests.fixtures import QueueDatabaseTest, d
+from wikimetrics.models import RunProgramMetricsReport, ReportStore, 
WikiUserStore
+from wikimetrics.utils import parse_pretty_date, format_pretty_date
+from wikimetrics.enums import Aggregation
+from wikimetrics.api import CohortService
+
+cohort_service = CohortService()
+
+
+def make_pending(report):
+    report.status = 'PENDING'
+
+
+class RunProgramMetricsReportWithInvalidCohort(QueueDatabaseTest):
+    def setUp(self):
+        QueueDatabaseTest.setUp(self)
+        self.common_cohort_1()
+
+    @raises(Exception)
+    def test_raises_unvalidated_cohort(self):
+        self.cohort.validated = False
+        jr = RunProgramMetricsReport(self.cohort.id,
+                                     parse_pretty_date('2015-01-01 00:00:00'),
+                                     parse_pretty_date('2015-01-31 00:00:00'),
+                                     self.owner_user_id)
+        jr.task.delay(jr).get()
+
+    def test_invalid_cohort_returns_failure(self):
+        self.cohort.validated = True
+        wikiusers = self.session.query(WikiUserStore) \
+            .filter(WikiUserStore.validating_cohort == self.cohort.id) \
+            .all()
+        # Valid cohorts have >=50% of the users valid, so we make more
+        # than half of them invalid to test
+        for wu in wikiusers[:(len(wikiusers) / 2 + 1)]:
+            wu.valid = False
+        self.session.commit()
+        jr = RunProgramMetricsReport(self.cohort.id,
+                                     parse_pretty_date('2015-01-01 00:00:00'),
+                                     parse_pretty_date('2015-01-31 00:00:00'),
+                                     self.owner_user_id)
+        results = jr.task.delay(jr).get()
+        self.session.commit()
+        result_key = self.session.query(ReportStore) \
+            .get(jr.persistent_id) \
+            .result_key
+        results = results[result_key]
+        assert_is_not_none(results['FAILURE'])
+
+
+class RunProgramMetricsReportTest(QueueDatabaseTest):
+    def setUp(self):
+        QueueDatabaseTest.setUp(self)
+        # registration for all the editors below
+        self.r = r = 20150101000000
+        # exactly 30 days after registration
+        self.m = m = 20150131000000
+        self.r_plus_30 = format_pretty_date(d(self.m))
+
+        self.create_test_cohort(
+            editor_count=5,
+            revisions_per_editor=8,
+            revision_timestamps=[
+                # this one will make 5 edits within 30 days of self.r_plus_30
+                [r + 1, r + 2, r + 3, r + 4, r + 5, m + 6, m + 7, m + 8],
+                # this one will make 3 edits within 30 days of self.r_plus_30
+                [r + 1, r + 2, r + 3, m + 4, m + 5, m + 6, m + 7, m + 8],
+                # this one will make 8 edits within 30 days of self.r_plus_30
+                [r + 1, r + 2, r + 3, r + 4, r + 5, r + 6, r + 7, r + 8],
+                # this one will make 0 edits within 30 days of self.r_plus_30
+                [m + 1, m + 2, m + 3, m + 4, m + 5, m + 6, m + 7, m + 8],
+                # this one will make the 5th edit right on self.r_plus_30
+                [r + 1, r + 2, r + 3, r + 4, m + 0, m + 6, m + 7, m + 8],
+            ],
+            user_registrations=r,
+            revision_lengths=10
+        )
+
+    @raises(Exception)
+    def test_empty_response(self):
+        """
+        Case where user tries to submit form with no cohorts / metrics
+        should be handled client side, on server side an exception will be
+        thrown if RunProgramMetricsReport object cannot be created
+        """
+        jr = RunProgramMetricsReport(None, None, None, 
user_id=self.owner_user_id)
+        jr.task.delay(jr).get()
+
+    def test_basic_response(self):
+        jr = RunProgramMetricsReport(self.cohort.id,
+                                     parse_pretty_date('2015-01-01 00:00:00'),
+                                     parse_pretty_date('2015-01-31 00:00:00'),
+                                     self.owner_user_id)
+        results = jr.task.delay(jr).get()
+        self.session.commit()
+        
+        # Make sure the cohort is validated
+        assert_true(self.cohort.validated)
+
+        result_key = self.session.query(ReportStore) \
+            .get(jr.persistent_id) \
+            .result_key
+        results = results[result_key]
+        assert_equals(
+            len(results[Aggregation.SUM]),
+            6)
+        assert_equals(
+            results[Aggregation.SUM]['pages_created'], 1)
+        assert_equals(
+            results[Aggregation.SUM]['pages_edited'], 4)
+        assert_equals(
+            results[Aggregation.SUM]['newly_registered'], 0)
+        assert_equals(
+            results[Aggregation.SUM]['existing_editors'], 5)
+        assert_equals(
+            results[Aggregation.SUM]['rolling_active_editor'], 0)
+        assert_equals(
+            results[Aggregation.SUM]['bytes_added'], 10)
diff --git a/wikimetrics/api/__init__.py b/wikimetrics/api/__init__.py
index 15e69bb..7d7367a 100644
--- a/wikimetrics/api/__init__.py
+++ b/wikimetrics/api/__init__.py
@@ -3,7 +3,7 @@
 from cohorts import *
 from tags import *
 from replication_lag import *
-
+from reports import *
 from batch import *
 
 # ignore flake8 because of F403 violation
diff --git a/wikimetrics/api/cohorts.py b/wikimetrics/api/cohorts.py
index e3d0c88..3b797cb 100644
--- a/wikimetrics/api/cohorts.py
+++ b/wikimetrics/api/cohorts.py
@@ -1,5 +1,5 @@
 from sqlalchemy.orm.exc import NoResultFound
-from sqlalchemy import func
+from sqlalchemy import func, distinct
 from wikimetrics.configurables import db
 from wikimetrics.exceptions import Unauthorized, InvalidCohort, DatabaseError
 from wikimetrics.models import cohort_classes, ValidatedCohort, WikiCohort
@@ -100,6 +100,16 @@
         """
         db_session = db.get_session()
         return db_session.query(CohortStore).get(cohort.id)
+
+    def fetch_by_id(self, cohort_id):
+        """
+        Fetches a CohortStore object from the database, without checking 
permissions
+
+        Parameters
+            cohort_id  : id of the CohortStore record to fetch
+        """
+        db_session = db.get_session()
+        return db_session.query(CohortStore).get(cohort_id)
 
     def get_cohort_by_name(self, db_session, name):
         """
@@ -276,7 +286,7 @@
             session.add(search)
             session.commit()
 
-    def get_validation_info(self, cohort, session):
+    def get_validation_info(self, cohort, session, unique_users=False):
         """
         Returns
             If the cohort has no validation information, an empty dictionary
@@ -286,6 +296,7 @@
             valid_count         : number of valid users
             total_count         : number of users in the cohort
             not_validated_count : users in the cohort not yet validated at all
+            percentage_valid    : percentage of valid users in the cohort
         """
         if not cohort.has_validation_info:
             # make UI work as little as possible, do not return nulls
@@ -327,8 +338,21 @@
 
         stats['total_count'] = sum(stats.values())
         stats['validated_count'] = stats['valid_count'] + 
stats['invalid_count']
-
+        stats['percentage_valid'] = (stats['valid_count'] /
+                                     float(stats['total_count']) * 100) \
+            if stats['total_count'] > 0 else 0
+        if unique_users:
+            stats['unique_users'] = self.get_unique_users(cohort, session)
         return stats
+
+    def get_unique_users(self, cohort, session):
+        """
+        Returns:
+            the number of distinct mediawiki usernames in this cohort
+        """
+        return 
session.query(func.count(distinct(WikiUserStore.mediawiki_username))) \
+            .filter(WikiUserStore.validating_cohort == cohort.id) \
+            .one()[0]
 
     def get_membership(self, cohort, session):
         wikiusers = (
@@ -432,3 +456,66 @@
         )
         db_session.add(cohort_tag)
         db_session.commit()
+
+    def delete_owner_cohort(self, db_session, cohort_id):
+        """
+        Deletes the cohort and all records associated with it if the user 
is the
+        owner.
+
+        Raises an error if it cannot delete the cohort.
+        """
+        db_session = db.get_session() if not db_session else db_session
+        # Check that there's only one owner and delete it
+        cu = db_session.query(CohortUserStore) \
+            .filter(CohortUserStore.cohort_id == cohort_id) \
+            .filter(CohortUserStore.role == CohortUserRole.OWNER) \
+            .delete()
+
+        if cu != 1:
+            db_session.rollback()
+            raise DatabaseError('No owner or multiple owners in cohort.')
+        else:
+            try:
+                # Delete all other non-owners from cohort_user
+                db_session.query(CohortUserStore) \
+                    .filter(CohortUserStore.cohort_id == cohort_id) \
+                    .delete()
+                db_session.query(CohortWikiUserStore) \
+                    .filter(CohortWikiUserStore.cohort_id == cohort_id) \
+                    .delete()
+
+                db_session.query(WikiUserStore) \
+                    .filter(WikiUserStore.validating_cohort == cohort_id) \
+                    .delete()
+
+                # Delete tags related to the cohort
+                db_session.query(CohortTagStore) \
+                    .filter(CohortTagStore.cohort_id == cohort_id) \
+                    .delete()
+
+                c = db_session.query(CohortStore) \
+                    .filter(CohortStore.id == cohort_id) \
+                    .delete()
+                if c < 1:
+                    raise DatabaseError
+            except DatabaseError:
+                db_session.rollback()
+                raise DatabaseError('Owner attempt to delete a cohort failed.')
+
+    def delete_viewer_cohort(self, db_session, user_id, cohort_id):
+        """
+        Used when deleting a user's connection to a cohort. Currently used 
when user
+        is a VIEWER of a cohort and want to remove that cohort from their list.
+
+        Raises exception when viewer is duplicated, nonexistent, or can not be 
deleted.
+        """
+        db_session = db.get_session() if not db_session else db_session
+        cu = db_session.query(CohortUserStore) \
+            .filter(CohortUserStore.cohort_id == cohort_id) \
+            .filter(CohortUserStore.user_id == user_id) \
+            .filter(CohortUserStore.role == CohortUserRole.VIEWER) \
+            .delete()
+
+        if cu != 1:
+            db_session.rollback()
+            raise DatabaseError('Viewer attempt delete cohort failed.')
diff --git a/wikimetrics/api/reports.py b/wikimetrics/api/reports.py
new file mode 100644
index 0000000..3c8c96f
--- /dev/null
+++ b/wikimetrics/api/reports.py
@@ -0,0 +1,22 @@
+from batch import write_report_task
+from wikimetrics.models.storage.report import ReportStore
+
+
+class ReportService(object):
+
+    def write_report_to_file(self, report, results, db_session):
+        """
+        Helper to write results of a report to file.
+
+        The post_process methods in RunReport and RunProgramMetricsReport
+        both use this functionality to write public reports, so we abstract it 
here.
+        """
+        db_report = db_session.query(ReportStore).get(report.persistent_id)
+
+        data = db_report.get_json_result(results)
+
+        # code below schedules an async task on celery to write the file
+        report_id_to_write = report.persistent_id
+        if report.recurrent_parent_id is not None:
+            report_id_to_write = report.recurrent_parent_id
+        write_report_task.delay(report_id_to_write, report.created, data)
diff --git a/wikimetrics/controllers/cohorts.py 
b/wikimetrics/controllers/cohorts.py
index af707e4..0274629 100644
--- a/wikimetrics/controllers/cohorts.py
+++ b/wikimetrics/controllers/cohorts.py
@@ -159,7 +159,7 @@
 
         if not task_key:
             validation['validation_status'] = 'UNKNOWN'
-            #TODO do these defaults really make sense?
+            # TODO do these defaults really make sense?
             validation['validated_count'] = number_of_wikiusers
             validation['total_count'] = number_of_wikiusers
             validation['valid_count'] = validation['total_count']
@@ -295,69 +295,6 @@
         raise DatabaseError('No role found in cohort user.')
 
 
-def delete_viewer_cohort(session, cohort_id):
-    """
-    Used when deleting a user's connection to a cohort. Currently used when 
user
-    is a VIEWER of a cohort and want to remove that cohort from their list.
-
-    Raises exception when viewer is duplicated, nonexistent, or can not be 
deleted.
-    """
-    cu = session.query(CohortUserStore) \
-        .filter(CohortUserStore.cohort_id == cohort_id) \
-        .filter(CohortUserStore.user_id == current_user.id) \
-        .filter(CohortUserStore.role == CohortUserRole.VIEWER) \
-        .delete()
-
-    if cu != 1:
-        session.rollback()
-        raise DatabaseError('Viewer attempt delete cohort failed.')
-
-
-def delete_owner_cohort(session, cohort_id):
-    """
-    Deletes the cohort and all associate records with that cohort if user is 
the
-    owner.
-
-    Raises an error if it cannot delete the cohort.
-    """
-    # Check that there's only one owner and delete it
-    cu = session.query(CohortUserStore) \
-        .filter(CohortUserStore.cohort_id == cohort_id) \
-        .filter(CohortUserStore.role == CohortUserRole.OWNER) \
-        .delete()
-
-    if cu != 1:
-        session.rollback()
-        raise DatabaseError('No owner or multiple owners in cohort.')
-    else:
-        try:
-            # Delete all other non-owners from cohort_user
-            session.query(CohortUserStore) \
-                .filter(CohortUserStore.cohort_id == cohort_id) \
-                .delete()
-            session.query(CohortWikiUserStore) \
-                .filter(CohortWikiUserStore.cohort_id == cohort_id) \
-                .delete()
-
-            session.query(WikiUserStore) \
-                .filter(WikiUserStore.validating_cohort == cohort_id) \
-                .delete()
-
-            # Delete tags related to the cohort
-            session.query(CohortTagStore) \
-                .filter(CohortTagStore.cohort_id == cohort_id) \
-                .delete()
-
-            c = session.query(CohortStore) \
-                .filter(CohortStore.id == cohort_id) \
-                .delete()
-            if c < 1:
-                raise DatabaseError
-        except DatabaseError:
-            session.rollback()
-            raise DatabaseError('Owner attempt to delete a cohort failed.')
-
-
 @app.route('/cohorts/delete/<int:cohort_id>', methods=['POST'])
 def delete_cohort(cohort_id):
     """
@@ -373,13 +310,13 @@
         # Owner wants to delete, no other viewers or
         # Owner wants to delete, have other viewers, delete from other 
viewer's lists too
         if owner_and_viewers >= 1 and role == CohortUserRole.OWNER:
-            delete_owner_cohort(session, cohort_id)
+            g.cohort_service.delete_owner_cohort(session, cohort_id)
             session.commit()
             return json_redirect(url_for('cohorts_index'))
 
         # Viewer wants to delete cohort from their list, doesn't delete cohort 
from db;l,
         elif owner_and_viewers > 1 and role == CohortUserRole.VIEWER:
-            delete_viewer_cohort(session, cohort_id)
+            g.cohort_service.delete_viewer_cohort(session, current_user.id, 
cohort_id)
             session.commit()
             return json_redirect(url_for('cohorts_index'))
 
diff --git a/wikimetrics/controllers/reports.py 
b/wikimetrics/controllers/reports.py
index b62021e..a762faf 100644
--- a/wikimetrics/controllers/reports.py
+++ b/wikimetrics/controllers/reports.py
@@ -4,19 +4,19 @@
 from StringIO import StringIO
 from sqlalchemy import or_
 from sqlalchemy.orm.exc import NoResultFound
-from flask import render_template, request, redirect, url_for, Response, 
abort, g
+from flask import render_template, request, redirect, url_for, Response, g, 
flash
 from flask.ext.login import current_user
-from sqlalchemy.exc import SQLAlchemyError
 from wikimetrics.configurables import app, db
+from wikimetrics.forms import ProgramMetricsForm
 from wikimetrics.models import (
-    Report, RunReport, ReportStore, WikiUserStore, WikiUserKey, TaskErrorStore
+    Report, RunReport, RunProgramMetricsReport, ReportStore,
+    WikiUserKey, TaskErrorStore, ValidateCohort
 )
 from wikimetrics.utils import (
-    json_response, json_error, json_redirect, thirty_days_ago, stringify
+    json_response, json_error, json_redirect, thirty_days_ago
 )
 from wikimetrics.enums import Aggregation, TimeseriesChoices
-from wikimetrics.exceptions import UnauthorizedReportAccessError
-from wikimetrics.api import PublicReportFileManager, CohortService
+from wikimetrics.api import PublicReportFileManager, CohortService, 
CentralAuthService
 
 
 @app.before_request
@@ -29,8 +29,11 @@
                     app.logger,
                     app.absolute_path_to_app_root)
         cohort_service = getattr(g, 'cohort_service', None)
+        centralauth_service = getattr(g, 'centralauth_service', None)
         if cohort_service is None:
             g.cohort_service = CohortService()
+        if centralauth_service is None:
+                g.centralauth_service = CentralAuthService()
 
 
 @app.route('/reports/unset-public/<int:report_id>', methods=['POST'])
@@ -127,6 +130,42 @@
 
     # TODO fix json_response to deal with ReportStore objects
     return json_response(reports=reports)
+
+
+@app.route('/reports/program-global-metrics', methods=['GET', 'POST'])
+def program_metrics_reports_request():
+    """
+    Renders a page that facilitates kicking off a new ProgramMetrics report
+    """
+    form = ProgramMetricsForm()
+
+    if request.method == 'POST':
+        form = ProgramMetricsForm.from_request(request)
+        try:
+            if not form.validate():
+                flash('Please fix validation problems.', 'warning')
+
+            else:
+                form.parse_records()
+                vc = ValidateCohort.from_upload(form, current_user.id)
+                gm = RunProgramMetricsReport(vc.cohort_id,
+                                             form.start_date.data,
+                                             form.end_date.data,
+                                             current_user.id)
+                # Validate the cohort, and on success, call the
+                # RunProgramMetricsReport task. No parameters are passed
+                # from ValidateCohort to the report, so we are using
+                # an immutable task signature in the link param.
+                vc.task.apply_async([vc], link=gm.task.si(gm))
+                return redirect(url_for('reports_index'))
+        except Exception, e:
+            app.logger.exception(str(e))
+            flash('Server error while processing your request', 'error')
+
+    return render_template(
+        'program_metrics_reports.html',
+        form=form,
+    )
 
 
 def get_celery_task(result_key):
@@ -434,22 +473,22 @@
     return json_response(message='Report scheduled for rerun')
 
 
-#@app.route('/reports/kill/<result_key>')
-#def report_kill(result_key):
-    #return 'not implemented'
-    #db_session = db.get_session()
-    #db_report = db_session.query(ReportStore).get(result_key)
-    #if not db_report:
-    #    return json_error('no task exists with id: {0}'.format(result_key))
-    #celery_task = Report.task.AsyncResult(db_report.result_key)
-    #app.logger.debug('revoking task: %s', celery_task.id)
-    #from celery.task.control import revoke
-    #celery_task.revoke()
-    # TODO figure out how to terminate tasks. this throws an error
-    # which I believe is related to 
https://github.com/celery/celery/issues/1153
-    # and which is fixed by a patch.  however, I can't get things running
-    # with development version
-    #revoke(celery_task.id, terminate=True)
-    #return json_response(status=celery_task.status)
+# @app.route('/reports/kill/<result_key>')
+# def report_kill(result_key):
+#     return 'not implemented'
+#     db_session = db.get_session()
+#     db_report = db_session.query(ReportStore).get(result_key)
+#     if not db_report:
+#        return json_error('no task exists with id: {0}'.format(result_key))
+#     celery_task = Report.task.AsyncResult(db_report.result_key)
+#     app.logger.debug('revoking task: %s', celery_task.id)
+#     from celery.task.control import revoke
+#     celery_task.revoke()
+#     TODO figure out how to terminate tasks. this throws an error
+#     which I believe is related to 
https://github.com/celery/celery/issues/1153
+#     and which is fixed by a patch.  however, I can't get things running
+#     with development version
+#     revoke(celery_task.id, terminate=True)
+#     return json_response(status=celery_task.status)
 
-########   Internal functions not available via HTTP 
################################
+# #######   Internal functions not available via HTTP 
################################
diff --git a/wikimetrics/forms/__init__.py b/wikimetrics/forms/__init__.py
index bafa288..be63cfb 100644
--- a/wikimetrics/forms/__init__.py
+++ b/wikimetrics/forms/__init__.py
@@ -1,5 +1,6 @@
 from secure_form import *
 from cohort_upload import *
+from program_metrics_form import *
 from fields import *
 
 # ignore flake8 because of F403 violation
diff --git a/wikimetrics/forms/global_metrics_form.py 
b/wikimetrics/forms/global_metrics_form.py
deleted file mode 100644
index a6b1d38..0000000
--- a/wikimetrics/forms/global_metrics_form.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import uuid
-
-from flask import g
-from wikimetrics.configurables import db
-from wtforms import HiddenField
-from wikimetrics.forms.fields import BetterBooleanField, BetterDateTimeField
-from wtforms.validators import Required
-from wikimetrics.forms.validators import NotGreater
-from wikimetrics.utils import parse_username, thirty_days_ago, today, 
format_pretty_date
-from cohort_upload import CohortUpload
-from validators import (
-    CohortNameUnused, CohortNameLegalCharacters, ProjectExists, RequiredIfNot
-)
-
-
-class GlobalMetricsForm(CohortUpload):
-    """
-    Defines the fields necessary to upload inputs to calculate
-    the global metrics
-    """
-    # Override cohort name and default project
-    # The user for the Global API doesn't have to define these
-    name                    = HiddenField(default='GlobalCohort_' + 
str(uuid.uuid1()))
-    project                 = HiddenField(default='enwiki')
-    validate_as_user_ids    = HiddenField(default='False')
-    start_date              = BetterDateTimeField(
-        default=thirty_days_ago, validators=[NotGreater('end_date')])
-    end_date                = BetterDateTimeField(default=today)
diff --git a/wikimetrics/forms/program_metrics_form.py 
b/wikimetrics/forms/program_metrics_form.py
new file mode 100644
index 0000000..3724e2d
--- /dev/null
+++ b/wikimetrics/forms/program_metrics_form.py
@@ -0,0 +1,23 @@
+import uuid
+
+from wtforms import HiddenField
+from wikimetrics.forms.fields import BetterDateTimeField
+from wikimetrics.forms.validators import NotGreater
+from wikimetrics.utils import thirty_days_ago, today
+from cohort_upload import CohortUpload
+
+
+class ProgramMetricsForm(CohortUpload):
+    """
+    Defines the fields necessary to upload inputs to calculate
+    the ProgramMetrics
+    """
+    # Override cohort name and default project
+    # The user for the ProgramMetrics API doesn't have to define these
+    name                    = HiddenField(
+        default='ProgramGlobalMetricsCohort_' + str(uuid.uuid1()))
+    project                 = HiddenField(default='enwiki')
+    validate_as_user_ids    = HiddenField(default='False')
+    start_date              = BetterDateTimeField(
+        default=thirty_days_ago, validators=[NotGreater('end_date')])
+    end_date                = BetterDateTimeField(default=today)
diff --git a/wikimetrics/models/report_nodes/__init__.py 
b/wikimetrics/models/report_nodes/__init__.py
index c26abca..9a52fba 100644
--- a/wikimetrics/models/report_nodes/__init__.py
+++ b/wikimetrics/models/report_nodes/__init__.py
@@ -4,6 +4,7 @@
 from sum_aggregate_by_user_report import *
 from report import *
 from run_report import *
-
+from run_program_metrics_report import *
+from validate_program_metrics_report import *
 # ignore flake8 because of F403 violation
 # flake8: noqa
diff --git a/wikimetrics/models/report_nodes/report.py 
b/wikimetrics/models/report_nodes/report.py
index 41c7b82..10c709e 100644
--- a/wikimetrics/models/report_nodes/report.py
+++ b/wikimetrics/models/report_nodes/report.py
@@ -168,7 +168,6 @@
         """
         self.set_status(celery.states.STARTED, task_id=current_task.request.id)
         results = []
-        
         if self.children:
             try:
                 child_results = [child.run() for child in self.children]
diff --git a/wikimetrics/models/report_nodes/run_program_metrics_report.py 
b/wikimetrics/models/report_nodes/run_program_metrics_report.py
new file mode 100644
index 0000000..177d555
--- /dev/null
+++ b/wikimetrics/models/report_nodes/run_program_metrics_report.py
@@ -0,0 +1,269 @@
+from celery.utils.log import get_task_logger
+from datetime import timedelta
+
+from wikimetrics.enums import Aggregation
+from wikimetrics.metrics import metric_classes
+from report import ReportNode
+from aggregate_report import AggregateReport
+from validate_program_metrics_report import ValidateProgramMetricsReport
+from sum_aggregate_by_user_report import SumAggregateByUserReport
+from wikimetrics.api import ReportService, CohortService
+from wikimetrics.configurables import db
+
+__all__ = ['RunProgramMetricsReport']
+task_logger = get_task_logger(__name__)
+cohort_service = CohortService()
+
+
+class RunProgramMetricsReport(ReportNode):
+    """
+    Launches the five child reports needed to calculate the program global
+    metrics, merges their results into a single report result (see finish)
+    and deletes the cohort once the report is done.
+    """
+
+    show_in_ui = True
+
+    def __init__(self,
+                 cohort_id,
+                 start_date,
+                 end_date,
+                 user_id=0):
+        """
+        Parameters:
+            cohort_id       : ID of the cohort
+            start_date      : Start date for the GlobalMetrics report
+            end_date        : End date for the GlobalMetrics report
+            user_id         : The user running this report
+        """
+        self.cohort_id = cohort_id
+        self.start_date = start_date
+        self.end_date = end_date
+        self.user_id = user_id
+        self.recurrent_parent_id = None
+        self.persistent_id = None
+
+    def run(self):
+        """
+        Loads the cohort, builds the parameters not known at init time,
+        then calls the super class' __init__ and its run method.
+
+        Raises:
+            Exception if the cohort does not exist or is not validated
+        """
+        cohort_store_object = cohort_service.fetch_by_id(self.cohort_id)
+        # First make sure this is a valid cohort
+        if cohort_store_object is not None and cohort_store_object.validated:
+            self.cohort = cohort_service.convert(cohort_store_object)
+            validate_report = ValidateProgramMetricsReport(self.cohort,
+                                                           db.get_session(),
+                                                           user_id=self.user_id)
+            self.cohort.size = validate_report.unique_users
+            self.parameters = {
+                'name': 'Program Global Metrics Report',
+                'cohort': {
+                    'id': self.cohort.id,
+                    'name': self.cohort.name,
+                    'size': self.cohort.size,
+                },
+                'user_id': self.user_id,
+                'metric': {
+                    'name': 'ProgramGlobalMetrics',
+                    'end_date': self.end_date
+                },
+            }
+
+            super(RunProgramMetricsReport, self).__init__(
+                name=self.parameters['name'],
+                user_id=self.user_id,
+                parameters=self.parameters,
+                public=False,
+                recurrent=False,
+                recurrent_parent_id=self.recurrent_parent_id,
+                created=None,
+                store=True,
+                persistent_id=self.persistent_id,
+            )
+            # An invalid cohort gets the validation report as its only child
+            if validate_report.valid():
+                self.children = [self.get_active_editors_report(),
+                                 self.get_new_editors_report(),
+                                 self.get_pages_created_report(),
+                                 self.get_pages_edited_report(),
+                                 self.get_bytes_added_report()]
+            else:
+                self.children = [validate_report]
+            return super(RunProgramMetricsReport, self).run()
+        else:
+            # This should never happen, unless it's a test where
+            # RunProgramMetricsReport is being directly instantiated.
+            task_logger.error("Cohort not validated")
+            # Clean up cohort anyway
+            cohort_service.delete_owner_cohort(None, self.cohort_id)
+            raise Exception("Cohort not validated")
+
+    def finish(self, aggregated_results):
+        """Deletes the cohort and reports the child results as one result."""
+        # Delete the cohort - we don't want to store these cohorts permanently
+        cohort_service.delete_owner_cohort(None, self.cohort_id)
+        if len(aggregated_results) > 1:
+            # Indexes below follow the order of self.children set in run()
+            new_editors_count = aggregated_results[1][Aggregation.SUM]['newly_registered']
+
+            # Existing editors = cohort size - new editors; computed here and
+            # appended to the list of aggregated_results.
+            existing_editors_count = self.cohort.size - new_editors_count
+            aggregated_results.append({Aggregation.SUM:
+                                       {'existing_editors': existing_editors_count}})
+
+            # Manually rename absolute_sum to bytes_added - this looks silly, there
+            # is probably a better way to do it
+            bytes_added_count = aggregated_results[4][Aggregation.SUM]['absolute_sum']
+            aggregated_results[4] = ({Aggregation.SUM:
+                                      {'bytes_added': bytes_added_count}})
+
+            # At this point aggregated_results is a list of dicts that looks like:
+            # [{Aggregation.SUM: {'newly_registered':3}},
+            #  {Aggregation.SUM: {'existing_editors':3}}]
+            # We merge it into one dict keyed by Aggregation.SUM with all submetrics,
+            # like: {Aggregation.SUM: {'newly_registered': 3, 'existing_editors': 3}}
+            submetrics = [s[Aggregation.SUM] for s in aggregated_results]
+            result = {}
+            for s in submetrics:
+                result.update(s)
+            return self.report_result({Aggregation.SUM: result})
+        else:
+            result = self.report_result(aggregated_results[0])
+            return result
+
+    def post_process(self, results):
+        """
+        Returns early when self.public is False; otherwise hands the results
+        to ReportService.write_report_to_file.
+
+        Parameters:
+            results : data to write to disk, in this form:
+                {'5cab8d55-da19-436f-b675-1d2a3fca3481':
+                    {'Sum': {'pages_created': Decimal('0.0000')}}
+                }
+        """
+        if self.public is False:
+            return
+        rs = ReportService()
+        rs.write_report_to_file(self, results, db.get_session())
+
+    def get_aggregate_by_user_report(self, parameters):
+        """Builds a SumAggregateByUserReport for the metric in parameters."""
+        metric_dict = parameters['metric']
+        metric = metric_classes[metric_dict['name']](**metric_dict)
+        return SumAggregateByUserReport(self.cohort,
+                                        metric,
+                                        parameters=parameters,
+                                        user_id=self.user_id)
+
+    def get_aggregate_report(self, parameters):
+        """Builds an AggregateReport for the metric and options in parameters."""
+        metric_dict = parameters['metric']
+        metric = metric_classes[metric_dict['name']](**metric_dict)
+        return AggregateReport(metric,
+                               self.cohort,
+                               options=parameters['options'],
+                               user_id=self.user_id)
+
+    def get_active_editors_report(self):
+        return self.get_aggregate_by_user_report({
+            'name': 'Active Editors report',
+            'cohort': {
+                'id': self.cohort.id,
+                'name': self.cohort.name,
+            },
+            'metric': {
+                'name': 'RollingActiveEditor',
+                # NOTE(review): uses the report's start_date as end_date - confirm
+                'end_date': self.start_date,
+                'individualResults': True,
+            },
+        })
+
+    def get_new_editors_report(self):
+        return self.get_aggregate_by_user_report({
+            'name': 'New Editors report',
+            'cohort': {
+                'id': self.cohort.id,
+                'name': self.cohort.name,
+            },
+            'metric': {
+                'name': 'NewlyRegistered',
+                # Registration window: the two weeks leading up to end_date
+                'start_date': self.end_date - timedelta(weeks=2),
+                'end_date': self.end_date,
+                'individualResults': True,
+            },
+        })
+
+    def get_pages_created_report(self):
+        return self.get_aggregate_report({
+            'name': 'Pages created report',
+            'cohort': {
+                'id': self.cohort.id,
+                'name': self.cohort.name,
+            },
+            'metric': {
+                'name': 'PagesCreated',
+                'namespaces': [0],
+                'start_date': self.start_date,
+                'end_date': self.end_date,
+                'aggregateResults': True,
+                'aggregateSum': True,
+            },
+            'options': {
+                'aggregateResults': True,
+                'aggregateSum': True,
+            }
+        })
+
+    def get_pages_edited_report(self):
+        return self.get_aggregate_by_user_report({
+            'name': 'Pages Edited report',
+            'cohort': {
+                'id': self.cohort.id,
+                'name': self.cohort.name,
+            },
+            'metric': {
+                'name': 'PagesEdited',
+                'namespaces': [0],
+                'start_date': self.start_date,
+                'end_date': self.end_date,
+                'aggregateResults': True,
+                'aggregateSum': True,
+            },
+            'options': {
+                'aggregateResults': True,
+                'aggregateSum': True,
+            }
+        })
+
+    def get_bytes_added_report(self):
+        return self.get_aggregate_report({
+            'name': 'Bytes Added report',
+            'cohort': {
+                'id': self.cohort.id,
+                'name': self.cohort.name,
+            },
+            'metric': {
+                'name': 'BytesAdded',
+                'namespaces': [0],
+                'start_date': self.start_date,
+                'end_date': self.end_date,
+                'aggregateResults': True,
+                'aggregateSum': True,
+                'absolute_sum': True,
+                'negative_only_sum': False,
+                'positive_only_sum': False,
+                'net_sum': False,
+            },
+            'options': {
+                'aggregateResults': True,
+                'aggregateSum': True,
+            }
+        })
diff --git a/wikimetrics/models/report_nodes/run_report.py b/wikimetrics/models/report_nodes/run_report.py
index bcc3968..720bd36 100644
--- a/wikimetrics/models/report_nodes/run_report.py
+++ b/wikimetrics/models/report_nodes/run_report.py
@@ -18,7 +18,7 @@
 from null_report import NullReport
 from validate_report import ValidateReport
 from metric_report import MetricReport
-from wikimetrics.api import write_report_task, CohortService
+from wikimetrics.api import ReportService, CohortService
 from wikimetrics.utils import stringify
 from wikimetrics.schedules import recurring_reports
 
@@ -145,16 +145,8 @@
         if self.public is False:
             return
 
-        session = db.get_session()
-        db_report = session.query(ReportStore).get(self.persistent_id)
-
-        data = db_report.get_json_result(results)
-
-        # code below schedules an async task on celery to write the file
-        report_id_to_write = self.persistent_id
-        if self.recurrent_parent_id is not None:
-            report_id_to_write = self.recurrent_parent_id
-        write_report_task.delay(report_id_to_write, self.created, data)
+        rs = ReportService()
+        rs.write_report_to_file(self, results, db.get_session())
 
     # TODO, this method belongs on a different class and it should not be a class method
     @classmethod
diff --git a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
index 142782c..b41813c 100644
--- a/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
+++ b/wikimetrics/models/report_nodes/sum_aggregate_by_user_report.py
@@ -13,7 +13,7 @@
 class SumAggregateByUserReport(ReportNode):
     """
     A node responsible for aggregating the results of MultiProjectMetricReport
-    by user, and formatting them as expected by RunGlobalReport. It specifically
+    by user, and formatting them as expected by RunProgramMetricsReport. It specifically
     knows how to aggregate rolling active editor and newly registered metrics.
     """
     show_in_ui = False
@@ -43,6 +43,7 @@
         # The way of aggregating results accross different projects
         # is applying the OR operator. Read more in the Wikitech docs:
         # https://wikitech.wikimedia.org/wiki/Analytics/Wikimetrics/Global_metrics
+        # (TODO: Rename page to Program_metrics)
         aggregated_results = defaultdict(lambda: defaultdict(lambda: 0))
         for key_str, result in results.iteritems():
             key = WikiUserKey.fromstr(key_str)
@@ -57,4 +58,4 @@
                 summed_results[metric_name] += value
 
         # Encapsulate the results to be consistent with other metrics.
-        return {Aggregation.SUM: summed_results}
+        return {Aggregation.SUM: dict(summed_results)}
diff --git a/wikimetrics/models/report_nodes/validate_program_metrics_report.py b/wikimetrics/models/report_nodes/validate_program_metrics_report.py
new file mode 100644
index 0000000..a205690
--- /dev/null
+++ b/wikimetrics/models/report_nodes/validate_program_metrics_report.py
@@ -0,0 +1,55 @@
+import celery
+from report import ReportLeaf
+from celery import current_task
+from wikimetrics.api import CohortService
+
+
+class ValidateProgramMetricsReport(ReportLeaf):
+    """
+    Checks that at least 50% of the cohort's members are valid.  If they are
+    not, this ReportLeaf can be added to a Report hierarchy and will report
+    the appropriate error.
+    """
+
+    show_in_ui = False
+
+    def __init__(self, cohort, session, *args, **kwargs):
+        """
+        Parameters
+            cohort          : the cohort whose members are checked
+            session         : DB session instance
+        """
+        super(ValidateProgramMetricsReport, self).__init__(*args, **kwargs)
+        self.cohort = cohort
+        self.session = session
+        # Computed once, up front: callers read unique_users and valid()
+        # without having to run the report.
+        self.validation_info = self.get_validation_info()
+        self.unique_users = self.validation_info['unique_users']
+        self.cohort_valid = self.is_cohort_valid()
+
+    def get_validation_info(self):
+        """Fetches the cohort's validation info through CohortService."""
+        return CohortService().get_validation_info(
+            self.cohort, self.session, True)
+
+    def is_cohort_valid(self):
+        """True when at least 50% of the cohort members are valid."""
+        return self.validation_info['percentage_valid'] >= 50.0
+
+    def valid(self):
+        """Returns the validity computed at construction time."""
+        return self.cohort_valid
+
+    def run(self):
+        """
+        This will get executed if the instance is added into a Report node
+        hierarchy.  It reports the failure message for an invalid cohort; the
+        result is keyed by 'FAILURE' even when the cohort is valid.
+        """
+        self.set_status(celery.states.STARTED, task_id=current_task.request.id)
+
+        if self.cohort_valid:
+            return {'FAILURE': 'False'}
+        return {'FAILURE':
+                'Cohort invalid: >=50% of the cohort members are not valid'}
diff --git a/wikimetrics/static/js/globalReportUpload.js b/wikimetrics/static/js/programMetricsReportUpload.js
similarity index 100%
rename from wikimetrics/static/js/globalReportUpload.js
rename to wikimetrics/static/js/programMetricsReportUpload.js
diff --git a/wikimetrics/templates/forms/global_report_upload.html b/wikimetrics/templates/forms/program_metrics_report_upload.html
similarity index 94%
rename from wikimetrics/templates/forms/global_report_upload.html
rename to wikimetrics/templates/forms/program_metrics_report_upload.html
index c171b25..08ff09f 100644
--- a/wikimetrics/templates/forms/global_report_upload.html
+++ b/wikimetrics/templates/forms/program_metrics_report_upload.html
@@ -1,14 +1,14 @@
 {% import 'forms/field_validation.html' as validation %}
 
 <form enctype="multipart/form-data"
-      action="{{url_for('global_reports_request')}}"
+      action="{{url_for('program_metrics_reports_request')}}"
       method="POST"
       class="upload-cohort form-horizontal">
 
     {{ form.csrf_token }}
 
     <div class="navbar-inner">
-        <h3>Launch a global report</h3>
+        <h3>Launch a Program Global Metrics report</h3>
     </div>
     <!-- These fields are hidden since the user doesn't have to provide values for them -->
     <input id="name" name="name" type="hidden" value="{{form.name.data}}"/>
@@ -61,6 +61,6 @@
         </div>
     </div>
     <div class="form-actions">
-        <input type="submit" class="btn btn-primary" value="Launch Global Report"/>
+        <input type="submit" class="btn btn-primary" value="Launch Program Global Metrics Report"/>
     </div>
-</form>
\ No newline at end of file
+</form>
diff --git a/wikimetrics/templates/global_reports.html b/wikimetrics/templates/program_metrics_reports.html
similarity index 70%
rename from wikimetrics/templates/global_reports.html
rename to wikimetrics/templates/program_metrics_reports.html
index 9a60a27..7655c00 100644
--- a/wikimetrics/templates/global_reports.html
+++ b/wikimetrics/templates/program_metrics_reports.html
@@ -1,11 +1,11 @@
 {% extends "layout.html" %}
 {% block body %}
-{% include "forms/global_report_upload.html" %}
+{% include "forms/program_metrics_report_upload.html" %}
 {% endblock %}
 
 {% block scripts %}
 <script src="//ajax.aspnetcdn.com/ajax/jquery.validate/1.11.1/jquery.validate.min.js"></script>
 <script src="{{ url_for('static', filename='js/bootstrap-datetimepicker.min.js') }}"></script>
 <script src="{{ url_for('static', filename='js/cohortUpload.js') }}"></script>
-<script src="{{ url_for('static', filename='js/globalReportUpload.js') }}"></script>
+<script src="{{ url_for('static', filename='js/programMetricsReportUpload.js') }}"></script>
 {% endblock %}
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/253750
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1d4a6bba9f81962d01990754299a22a077ba61df
Gerrit-PatchSet: 20
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Madhuvishy <mviswanat...@wikimedia.org>
Gerrit-Reviewer: Madhuvishy <mviswanat...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Milimetric <dandree...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to