Yurik has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/144682

Change subject: Initial checkin - some data sanitizing code
......................................................................

Initial checkin - some data sanitizing code

Change-Id: I5e45f0ee8a8427307b4250e34cfcd030b7f355d6
---
A .gitignore
A download.py
A generate.py
3 files changed, 911 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/zero-sms refs/changes/82/144682/1

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0648918
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+/.idea
+/data
+/private.py
+*.pyc
+/ResultGraphs.xlsx
+/state
+/graphs
diff --git a/download.py b/download.py
new file mode 100644
index 0000000..c12ebab
--- /dev/null
+++ b/download.py
@@ -0,0 +1,331 @@
+# coding=utf-8
+import pipes
+import string
+import subprocess
+import locale
+from datetime import datetime, timedelta
+import re
+import os
+import json
+import traceback
+
+from boto.s3.connection import S3Connection
+import io
+import itertools
+import generate
+
+import private
+
+
+def generatePassword(size=10, chars=string.ascii_letters + string.digits):
+    """ Adapted from
+    http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
+    """
+    import random
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+class Processor(object):
+    dateFormat = '%Y-%m-%d'
+
+    def __init__(self, dataDir='data', workDir='state', graphDir='graphs', settingsFile='settings.json'):
+        self.dataDir = dataDir
+        self.workDir = workDir
+        self.graphDir = graphDir
+        if not os.path.exists(dataDir): os.mkdir(dataDir)
+        if not os.path.exists(workDir): os.mkdir(workDir)
+        if not os.path.exists(graphDir): os.mkdir(graphDir)
+
+        self.settingsFilePath = os.path.join(self.workDir, settingsFile)
+        self.combinedFilePath = os.path.join(self.workDir, 'combined.tsv')
+        self.tempFilePath = os.path.join(self.workDir, 'temp.tsv')
+        if os.path.exists(self.tempFilePath): os.remove(self.tempFilePath)
+        self.statsFilePath = os.path.join(self.workDir, 'combined.json')
+        if os.path.exists(self.statsFilePath): os.remove(self.statsFilePath)
+
+        data = self.loadState()
+
+        self.enableDownload = data['enableDownload'] if 'enableDownload' in data else True
+        self.enableDownloadOld = data['enableDownloadOld'] if 'enableDownloadOld' in data else True
+        self.lastDownloadTs = self.parseDate(data['lastDownloadTs']) if 'lastDownloadTs' in data else False
+        self.downloadOverlapDays = data['downloadOverlapDays'] if 'downloadOverlapDays' in data else False
+        self.lastProcessedTs = self.parseDate(data['lastProcessedTs']) if 'lastProcessedTs' in data else False
+        self.processOverlapDays = data['processOverlapDays'] if 'processOverlapDays' in data else 1
+        self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False
+        self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False
+        self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False
+        self.lastErrorTs = self.parseDate(data['lastErrorTs']) if 'lastErrorTs' in data else False
+        self.lastErrorMsg = data['lastErrorMsg'] if 'lastErrorMsg' in data else False
+        self.sortCmd = data['sortCmd'] if 'sortCmd' in data else 'sort'
+        self.lastGoodRunTs = self.parseDate(data['lastGoodRunTs']) if 'lastGoodRunTs' in data else False
+        self.partnerMap = data['partnerMap'] if 'partnerMap' in data else {}
+        self.partnerDirMap = data['partnerDirMap'] if 'partnerDirMap' in data else {}
+        self.salt = data['salt'] if 'salt' in data else generatePassword()
+
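+        # Cutoff timestamps derived from the overlap settings: files dated after
+        # downloadIfAfter get re-downloaded and files dated after processIfAfter
+        # get re-processed, so each run overlaps the previous one by a few days.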
+        if self.downloadOverlapDays and self.lastDownloadTs:
+            self.downloadIfAfter = self.lastDownloadTs - timedelta(days=self.downloadOverlapDays)
+        else:
+            self.downloadIfAfter = False
+
+        if self.lastProcessedTs:
+            self.processIfAfter = self.lastProcessedTs - timedelta(days=self.processOverlapDays)
+        else:
+            self.processIfAfter = False
+
+        # wikipedia_application_3.log.2014-06-11
+        self.fileRe = re.compile(ur'^wikipedia_application_\d+\.log\.(?P<date>\d{4}-\d{2}-\d{2})$', re.IGNORECASE)
+
+    def loadState(self):
+        if os.path.isfile(self.settingsFilePath):
+            with io.open(self.settingsFilePath, 'rb') as f:
+                return json.load(f)
+        return {}
+
+    def saveState(self):
+        data = self.loadState()
+        data['enableDownload'] = self.enableDownload
+        data['enableDownloadOld'] = self.enableDownloadOld
+        data['lastDownloadTs'] = self.formatDate(self.lastDownloadTs)
+        data['downloadOverlapDays'] = int(self.downloadOverlapDays) if self.downloadOverlapDays else False
+        data['lastProcessedTs'] = self.formatDate(self.lastProcessedTs)
+        data['processOverlapDays'] = int(self.processOverlapDays) if self.processOverlapDays else False
+        data['smtpHost'] = self.smtpHost
+        data['smtpFrom'] = self.smtpFrom
+        data['smtpTo'] = self.smtpTo
+        data['lastErrorTs'] = self.formatDate(self.lastErrorTs)
+        data['lastErrorMsg'] = self.lastErrorMsg
+        data['sortCmd'] = self.sortCmd
+        data['lastGoodRunTs'] = self.formatDate(self.lastGoodRunTs)
+        data['partnerMap'] = self.partnerMap
+        data['partnerDirMap'] = self.partnerDirMap
+        data['salt'] = self.salt
+
+        stateBk = self.settingsFilePath + '.bak'
+        with open(stateBk, 'wb') as f:
+            json.dump(data, f, indent=True, sort_keys=True)
+        if os.path.exists(self.statsFilePath):
+            os.remove(self.settingsFilePath)
+            os.rename(stateBk, self.settingsFilePath)
+
+    def parseDate(self, value):
+        return datetime.strptime(str(value), self.dateFormat) if isinstance(value, basestring) else value
+
+    def formatDate(self, value):
+        return value.strftime(self.dateFormat) if isinstance(value, datetime) else value
+
+    def getFileDate(self, filename):
+        m = self.fileRe.match(filename)
+        return self.parseDate(m.group('date')) if m else False
+
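+    # Mirror the partner log files from the S3 bucket into dataDir: skip empty
+    # files (and legacy-named ones when enableDownloadOld is off), fetch anything
+    # that is missing, differs in size, or falls inside the re-download overlap
+    # window, and keep a .bak copy of any non-empty local file being replaced.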
+    def download(self):
+        cn = S3Connection(private.aws_access_key_id, private.aws_secret_access_key)
+        prefix = 'prd-vumi-wikipedia.aws.prk-host.net/'
+        bucket = cn.get_bucket(private.bucket_name)
+        files = bucket.list(prefix)
+
+        for key in files:
+            filename = key.key[len(prefix):]
+            filePath = os.path.join(self.dataDir, filename)
+            fileDate = self.getFileDate(filename)
+            fileExists = os.path.exists(filePath)
+
+            if not self.enableDownloadOld and not fileDate:
+                print('Skipping legacy-named file %s' % filename)
+                continue
+            elif key.size == 0:
+                print('Skipping empty file %s' % filename)
+                continue
+            elif not fileExists:
+                reason = u"it doesn't exist"
+            elif key.size != os.stat(filePath).st_size:
+                reason = u'local size %s <> remote %s' % (
+                    locale.format(u"%d", os.stat(filePath).st_size, grouping=True),
+                    locale.format(u"%d", key.size, grouping=True))
+            elif fileDate and self.downloadIfAfter and fileDate > self.downloadIfAfter:
+                reason = u'date is too close to last file date %s' % self.downloadIfAfter
+            else:
+                continue
+
+            print('Downloading %s because %s' % (filename, reason))
+            if fileExists:
+                if os.stat(filePath).st_size == 0:
+                    print('Removing empty file %s' % filePath)
+                    os.remove(filePath)
+                else:
+                    bakCount = 0
+                    bakFile = filePath + '.bak'
+                    while os.path.exists(bakFile):
+                        bakCount += 1
+                        bakFile = filePath + '.bak' + str(bakCount)
+                    print('Renaming %s => %s' % (filePath, bakFile))
+                    os.rename(filePath, bakFile)
+
+            key.get_contents_to_filename(filePath)
+            if fileDate and (not self.lastDownloadTs or self.lastDownloadTs < fileDate):
+                self.lastDownloadTs = fileDate
+
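+    # Append the selected raw log files into one combined TSV. A log record can
+    # span several physical lines; continuation lines are tab-joined onto the
+    # preceding ' WIKI' record before it is written out.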
+    def combineDataFiles(self, sourceFiles):
+
+        print 'Combining files into %s' % self.combinedFilePath
+        print 'Processing %s' % (('files on or after %s' % self.processIfAfter) if self.processIfAfter else 'all files')
+        with io.open(self.combinedFilePath, 'a', encoding='utf8') as dst:
+            totalCount = 0
+            for srcFile in sourceFiles:
+
+                fileDate = self.getFileDate(srcFile)
+                if self.processIfAfter:
+                    if not fileDate:
+                        continue # old style filename, and the processIfAfter is set
+                    elif fileDate <= self.processIfAfter:
+                        continue # we have already processed this file
+
+                srcFilePath = os.path.join(self.dataDir, srcFile)
+                if not os.path.isfile(srcFilePath):
+                    print 'File %s was not found, skipping' % srcFilePath
+                    continue
+                last = False
+                count = 0
+                for line in io.open(srcFilePath, 'r', encoding='utf8'):
+                    count += 1
+                    totalCount += 1
+                    if count == 1 or totalCount % 30000 == 0:
+                        print('File %s, line %d, total lines %d' % (srcFile, count-1, totalCount-1))
+
+                    l = line.strip('\n\r')
+                    if u' WIKI\t' in l:
+                        self.writeLine(dst, last)
+                        last = l
+                    elif len(l) > 2 and l[0] == u'2' and l[1] == u'0':
+                        self.writeLine(dst, last)
+                        last = False
+                    elif isinstance(last, basestring):
+                        last = last + '\t' + l
+
+                self.writeLine(dst, last)
+                if fileDate and (not self.lastProcessedTs or self.lastProcessedTs < fileDate):
+                    self.lastProcessedTs = fileDate
+
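+    # Normalize one combined log record before writing it: drop records whose
+    # second field starts with '+', strip Python u'...' repr quoting from the
+    # fields, swap the first two columns while removing the log-channel tags and
+    # '+0000' suffix, and replace long content payloads with just their length.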
+    def writeLine(self, dst, line):
+        if not line:
+            return
+        line = line.replace(u'\0', u'\\0')
+        parts = line.split('\t')
+        if parts[1][0] == u'+':
+            return
+        parts = [p[2:-1]
+                 if (p.startswith(u"u'") and p.endswith(u"'")) or (p.startswith(u'u"') and p.endswith(u'"'))
+                 else p for p in parts]
+        tmp = parts[0]
+        parts[0] = parts[1]
+        parts[1] = tmp\
+            .replace(u' [VumiRedis,client] WIKI', u'') \
+            .replace(u' [HTTP11ClientProtocol,client] WIKI', u'') \
+            .replace(u'+0000', u'')
+
+        if len(parts) > 5 and parts[5].startswith(u'content='):
+            parts[5] = u'content=' + str(len(parts[5]) - 10)
+
+        if len(parts) > 6:
+            parts[6] = parts[6].replace('\0', '\\0')
+
+        dst.write('\t'.join(parts) + '\n')
+
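+    # De-duplicate and sort the combined TSV with the external sort command,
+    # writing to a temp file and only swapping it into place if the sort succeeds.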
+    def sort(self):
+
+        args = [self.sortCmd, '-u', '-o', self.tempFilePath, self.combinedFilePath]
+        cmd = ' '.join([pipes.quote(v) for v in args])
+        print('\nSorting: %s' % cmd)
+        try:
+            tmp2 = self.tempFilePath + '2'
+            if os.path.exists(tmp2):
+                os.remove(tmp2)
+
+            subprocess.check_output(args, stderr=subprocess.STDOUT)
+
+            # Extra safety - keep old file until we rename temp to its name
+            os.rename(self.combinedFilePath, tmp2)
+            os.rename(self.tempFilePath, self.combinedFilePath)
+            os.remove(tmp2)
+
+        except subprocess.CalledProcessError, ex:
+            raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output))
+
+    def error(self, error):
+        self.lastErrorTs = datetime.now()
+        self.lastErrorMsg = error
+
+        print(error)
+
+        if not self.smtpHost or not self.smtpFrom or not self.smtpTo:
+            return
+
+        import smtplib
+        from email.mime.text import MIMEText
+        from email.mime.multipart import MIMEMultipart
+
+        msg = MIMEMultipart('alternative')
+        msg['Subject'] = 'SMS report error'
+        msg.attach(MIMEText(error, 'plain', 'utf-8'))
+
+        # m = MIMEText(error, 'plain', 'utf-8')
+        # m['From'] = self.smtpFrom
+        # m['To'] = self.smtpTo
+        # m['Subject'] = msg['Subject']
+
+        s = smtplib.SMTP(self.smtpHost)
+        s.sendmail(self.smtpFrom, self.smtpTo, msg.as_string().encode('ascii'))
+        s.quit()
+
+    def generateGraphData(self, skipParsing=False):
+        # if ((parts[2] == 'airtel_ke_ussd_transport' and parts[3] == 'ussd' and parts[4] == 'airtel') or
+        # (parts[2] == 'airtel_ke_sms_transport' and parts[3] == 'sms' and parts[4] == '')):
+        #     parts[2:5] = ['airtel']
+        # elif ((parts[2] == 'vumi_starcode_transport' and parts[3] == 'ussd' and parts[4] == '') or
+        #       (parts[2] == 'smpp_transport' and parts[3] == 'sms' and parts[4] == '')):
+        #     parts[2:5] = ['vumi']
+        # elif parts[2] == 'zambia_cellulant_ussd_transport' and parts[3] == 'ussd' and parts[4] == '':
+        #     parts[2:5] = ['zambia-cellulant']
+        # elif parts[2] == 'ambient_go_smpp_transport' and parts[3] == 'sms' and parts[4] == '':
+        #     parts[2:5] = ['ambient_go']
+        # elif ((parts[2] == 'truteq_8864_transport' or parts[2] == 'truteq_32323_transport') and parts[3] == 'ussd' and
+        #       parts[4] == ''):
+        #     parts[2:5] = ['truteq']
+        # elif parts[2] == 'equity_kenya_ussd_smpp_transport' and parts[3] == 'ussd' and parts[4] == '':
+        #     parts[2:5] = ['equity_ke']
+        # else:
+        #     raise BaseException(line)
+
+        stats = generate.Stats(self.combinedFilePath, self.graphDir, self.statsFilePath, self.partnerMap, self.partnerDirMap, self.salt)
+
+        if not skipParsing:
+            print('\nParsing data')
+            stats.process()
+            stats.pickle()
+        else:
+            print('Loading parsed data')
+            stats.unpickle()
+
+        print('Generating data files to %s' % self.graphDir)
+        # stats.dumpStats()
+        stats.createGraphs()
+
+    def run(self):
+        # noinspection PyBroadException
+        try:
+            # if self.enableDownload:
+            #     self.download()
+            # files = os.listdir(self.dataDir)
+            # # files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.dataDir, 'pc'))], files)
+            # self.combineDataFiles(files)
+            # self.sort()
+
+            self.generateGraphData(True)
+            self.lastGoodRunTs = datetime.now()
+        except:
+            self.error(traceback.format_exc())
+        self.saveState()
+
+
+if __name__ == "__main__":
+    prc = Processor()
+    prc.run()
\ No newline at end of file
diff --git a/generate.py b/generate.py
new file mode 100644
index 0000000..4c585a2
--- /dev/null
+++ b/generate.py
@@ -0,0 +1,573 @@
+import io
+import json
+from operator import itemgetter
+import os
+from datetime import *
+from collections import defaultdict
+from itertools import *
+
+# Daily totals -
+#
+# A. Number of sessions initiated
+# B. Number of unique users initiating sessions
+# C. Number of sessions cancelled at search box
+# D. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's
+# E. Number of sessions initiated and sent an SMS response, and had no option to reply for more
+# F. Number of sessions initiated and sent an SMS response, and were presented with an option to reply for more
+# G. Of the number above ('F'), how many of those opted to receive more information
+# H. Average number of 'reply for more' requests during a single session (from the set of users in 'D')
+# I. Average number of SMS's sent (from the set of users in 'D')
+# J. Total number of SMS's sent (from the set of users in 'D')
+#
+# Hourly totals for 24-hour daily period
+#
+# A. Number of sessions initiated
+# B. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's
+import re
+import unicodedata
+
+stateNames = {
+    u'start': u'0 start',
+    u'titles': u'1 titles',
+    u'section-invalid': u'1 wrong title',
+    u'section': u'2 sections',
+    u'content-invalid': u'2 wrong section',
+    u'ussdcontent': u'3 ussd content',
+    u'smscontent': u'3 sms content',
+    u'smscontent-2': u'3 sms content (2)',
+    u'smscontent-3+': u'3 sms content (3+)',
+    u'more-no-content': u'4 no-more-content',
+    u'more-no-session': u'5 no-more-session',
+    u'more-no-session-2': u'5 no-more-session (2)',
+    u'more-no-session-3+': u'5 no-more-session (3+)',
+    u'newuser': u'new user',
+}
+
+goodStates = [
+    u'start',
+    u'titles',
+    u'section',
+    u'smscontent',
+    u'smscontent-2',
+    u'smscontent-3+',
+]
+
+# State machine:
+#
+# start   ->   titles   ->   section -> ussdcontent -> smscontent(+) -> more-no-content
+#                v              v
+#        section-invalid  content-invalid
+#
+# NOTES:
+#   Sometimes smscontent appears in the logs before ussdcontent
+#
+okTransitions = {
+    (u'', u'start'),
+    (u'start', u'titles'),
+    (u'titles', u'section'),
+    (u'titles', u'section-invalid'),
+    (u'section', u'ussdcontent'),
+    (u'section', u'smscontent'),
+    (u'section', u'content-invalid'),
+    (u'ussdcontent', u'smscontent'),
+    (u'smscontent', u'ussdcontent'),
+    (u'smscontent', u'smscontent'),
+    (u'smscontent', u'more-no-content'),
+    (u'smscontent', u'more-no-session'),
+    (u'more-no-content', u'more-no-session'),
+    (u'more-no-session', u'more-no-session'),
+}
+
+multiactions = {u'smscontent', u'more-no-content', u'more-no-session'}
+
+entrySpecials = {'id', 'ts', 'partner'}
+
+
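+# One parsed user session: id, ts and partner are fixed attributes; every other
+# attribute is an action name mapped to its seconds-from-session-start (or a
+# list of them for repeatable actions). entryItems() yields only the actions.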
+class Entry(object):
+    def __init__(self, userId, ts, partner):
+        self.id = userId
+        self.ts = ts
+        self.partner = partner
+
+    def __setitem__(self, key, item):
+        self.__dict__[key] = item
+
+    def __getitem__(self, key):
+        return self.__dict__[key]
+
+    def __iter__(self):
+        return iter(self.__dict__)
+
+    def entryItems(self):
+        for v in self.__dict__.items():
+            if v[0] not in entrySpecials:
+                yield v
+
+    def __repr__(self):
+        return repr(self.__dict__)
+
+    def __len__(self):
+        return len(self.__dict__)
+
+    def __delitem__(self, key):
+        del self.__dict__[key]
+
+    def keys(self):
+        return self.__dict__.keys()
+
+    def values(self):
+        return self.__dict__.values()
+
+    def __cmp__(self, dict):
+        return cmp(self.__dict__, dict)
+
+    def __contains__(self, item):
+        return item in self.__dict__
+
+    def add(self, key, value):
+        self.__dict__[key] = value
+
+    def __call__(self):
+        return self.__dict__
+
+    def __unicode__(self):
+        return unicode(repr(self.__dict__))
+
+    def items(self):
+        return self.__dict__.items()
+
+
+# class SumEntryEncoder(json.JSONEncoder):
+#     def default(self, o):
+#         return super(SumEntryEncoder, self).default(o)
+
+
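+# Accumulator for a single stat bucket: always counts occurrences, and tracks
+# sum/min/max as well when a value is supplied; it can also be rebuilt directly
+# from a dict when the stats are reloaded from JSON.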
+class SumEntry(object):
+    def __init__(self, value=-1):
+        """
+        :type value: dict|string|False
+        """
+        if isinstance(value, dict):
+            self.__dict__ = value
+        else:
+            self.count = 1
+            if value >= 0:
+                self.sum = value
+                self.min = value
+                self.max = value
+
+    def addValue(self, value):
+        self.count += 1
+        if value >= 0:
+            self.sum += value
+            if self.min > value:
+                self.min = value
+            if self.max < value:
+                self.max = value
+
+    def countOnly(self):
+        return 'sum' not in self.__dict__
+
+
+class Stats(object):
+    def __init__(self, sourceFile, graphDir, stateFile, partnerMap=None, partnerDirMap=None, salt=''):
+        self.sourceFile = sourceFile
+        self.graphDir = graphDir
+        self.stateFile = stateFile
+        self.partnerMap = partnerMap if partnerMap is not None else {}
+        self.partnerDirMap = partnerDirMap if partnerDirMap is not None else {}
+        self.salt = salt
+        self.stats = {}
+
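+    # self.stats is a nested dict: partner -> stage -> key2 -> SumEntry. Every
+    # per-partner addition is mirrored under the synthetic 'allpartners' key so
+    # that overall totals are accumulated alongside the per-partner ones.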
+    def _addStats(self, partner, stage, key2, value=-1):
+        p = partner if partner is not None else 'allpartners'
+        try:
+            partnerStat = self.stats[p]
+        except KeyError:
+            partnerStat = dict()
+            self.stats[p] = partnerStat
+        try:
+            stat = partnerStat[stage]
+        except KeyError:
+            stat = dict()
+            partnerStat[stage] = stat
+        if key2 not in stat:
+            stat[key2] = SumEntry(value)
+        else:
+            stat[key2].addValue(value)
+        # two-stage addition - one for partner, one total
+        if partner is not None:
+            self._addStats(None, stage, key2, value)
+
+    def _cleanupStats(self):
+        del self.unique
+        del self.newUserUnique
+        for partner, partnerData in self.stats.items():
+            if partner == 'allpartners':
+                continue
+            for k in list([i for i in partnerData if u'_usrmonth_' in i]):
+                kk = k.split(u'_', 2)
+                # removing top 1% of the heavy users
+                vals = list(sorted(partnerData[k].values(), key=lambda l: l.count))
+                vals = vals[0:len(vals) - int(len(vals) * 0.01)]
+                for v in vals:
+                    self._addStats(partner, kk[0] + u'_' + kk[1], kk[1] + u'_' + kk[2], v.count)
+                del partnerData[k]
+                if k in self.stats['allpartners']:
+                    del self.stats['allpartners'][k]
+
+    def _addStatsUniqueUser(self, partner, key2, userId):
+        if userId not in self.newUserUnique:
+            self.newUserUnique.add(userId)
+            self._addStats(partner, u'newuser', key2)
+
+
+    def _addStatsUnique(self, partner, stage, key2, userId):
+        u = self.unique[stage]
+        if key2 not in u:
+            u[key2] = {userId}
+        elif userId not in u[key2]:
+            u[key2].add(userId)
+        else:
+            return
+        self._addStats(partner, stage, key2)
+
+    #        if not isError:
+    #            key2 = u'hourly_' + ts.strftime(u'%Y-%m-%d %H') + u':00'
+    #            self._addStats(partner, key, key2, value)
+
+    def addStats(self, partner, stage, ts, userId, value=-1, isError=False):
+        #        self._addStats(partner, key, u'_totals', value)
+        #        self._addStatsUnique(partner, stage + u'_unique', u'_totals', id)
+
+        key2 = u'daily_' + ts.strftime(u'%Y-%m-%d')
+        self._addStats(partner, stage, key2, value)
+        self._addStatsUnique(partner, stage + u'_unique', key2, userId)
+
+        self._addStats(partner, stage + u'_usrmonth_' + ts.strftime(u'%m-%Y'), userId)
+
+    def countStats(self, entry):
+        ts = entry.ts
+        userId = entry.id
+        self.addStats(entry.partner, u'start', ts, userId)
+        self._addStatsUniqueUser(entry.partner, u'daily_' + ts.strftime(u'%Y-%m-%d'), userId)
+        for k, v in entry.entryItems():
+            if type(v) is list:
+                maxN = 2
+                for i in range(min(len(v), maxN + 1)):
+                    self.addStats(entry.partner,
+                                  k + (u'' if i == 0 else '-' + str(i + 1) + ('+' if i == maxN else '')),
+                                  ts, userId, v[i])
+            else:
+                self.addStats(entry.partner, k, ts, userId, v)
+
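+    # Main parsing pass: walk the combined TSV, group consecutive rows with the
+    # same user id into a session that begins at its 'start' action, validate
+    # each step against okTransitions, log bad transitions to errors.txt, and
+    # feed every completed session into countStats().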
+    def process(self):
+        self.unique = defaultdict(dict)
+        self.newUserUnique = set()
+
+        cId = 0
+        cTime = 1
+        cPartner = 4
+        cAction = 5
+        cContent = 6
+
+        fErr = io.open(os.path.join(self.graphDir, 'errors.txt'), 'w', encoding='utf8')
+
+        isError = False
+        lastAction = u''
+        lastLine = u''
+        lastParts = False
+        entry = None
+        for line in io.open(self.sourceFile, encoding='utf8'):
+            if line == u'':
+                break
+            if line == lastLine:
+                continue
+            lastLine = line
+
+            parts = [v.strip() for v in line.split(u'\t')]
+            if len(parts) > cContent and parts[cContent].startswith(u'content='):
+                parts[cContent] = u'content=' + str(len(parts[cContent]) - 10) + u'chars'
+
+            action = parts[cAction]
+            timestamp = datetime.strptime(parts[cTime], u'%Y-%m-%d %H:%M:%S')
+            isNew = entry is None or entry.id != parts[cId] or action == u'start'
+
+            if isNew:
+                if entry is not None:
+                    self.countStats(entry)
+                partnerKey = u'|'.join(parts[cPartner - 2:cPartner + 1])
+                if partnerKey in self.partnerMap:
+                    partner = self.partnerMap[partnerKey]
+                else:
+                    if parts[cPartner] == u'':
+                        partner = u'-'.join(parts[cPartner - 2:cPartner])
+                    else:
+                        partner = parts[cPartner]
+                    self.partnerMap[partnerKey] = partner
+                entry = Entry(parts[cId], timestamp, partner)
+                lastParts = parts
+                lastAction = u''
+                isError = False
+
+            transition = (lastAction, action)
+            secondsFromStart = int((timestamp - entry.ts).total_seconds())
+            isMultiAction = action in multiactions
+            isNewAction = action not in entry
+
+            if isError or transition not in okTransitions or (not isNewAction and not isMultiAction):
+                if lastParts:
+                    fErr.write(u'\n' + lastParts[0] + u'\n' + (u'\t'.join(lastParts[1:])) + u'\n')
+                parts[cPartner] = str(secondsFromStart)
+                del parts[cId]
+                fErr.write(u'\t'.join(parts) + u'\n')
+                self.addStats(entry.partner, u'err--bad-transitions', timestamp, entry.id, secondsFromStart,
+                              isError=True)
+                key = (u'err-cont-' if isError else u'err-new-') + transition[0] + u'-' + transition[1]
+                self.addStats(entry.partner, key, timestamp, entry.id, secondsFromStart, isError=True)
+                lastParts = False
+                isError = True
+            elif not isNew:
+                # noinspection PyTypeChecker
+                lastParts = lastParts + [str(secondsFromStart)] + parts[cAction:]
+                if isNewAction:
+                    entry[action] = [secondsFromStart] if isMultiAction else secondsFromStart
+                else:
+                    entry[action].append(secondsFromStart)
+
+            lastAction = action
+
+        if lastParts:
+            self.countStats(entry)
+
+        self._cleanupStats()
+
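+    # pickle()/unpickle() round-trip self.stats through JSON; recursiveConvert()
+    # turns SumEntry objects into plain dicts before dumping (and back again
+    # afterwards), and rebuilds SumEntry objects after loading.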
+    def pickle(self):
+        with open(self.stateFile, 'wb') as f:
+            self.recursiveConvert(self.stats)
+            json.dump(self.stats, f, indent=True, sort_keys=True)
+            self.recursiveConvert(self.stats)
+
+    def unpickle(self):
+        with io.open(self.stateFile, 'rb') as f:
+            self.stats = json.load(f)
+        self.recursiveConvert(self.stats)
+
+    def recursiveConvert(self, d):
+        for (k, v) in d.items():
+            if isinstance(v, dict):
+                if 'count' in v:
+                    d[k] = SumEntry(v)
+                else:
+                    self.recursiveConvert(v)
+            elif isinstance(v, SumEntry):
+                d[k] = v.__dict__
+
+    def dumpStats(self):
+
+        with io.open(os.path.join(self.graphDir, 'results-err.txt'), 'w', encoding='utf8') as err, \
+                io.open(os.path.join(self.graphDir, 'results-err-totals.txt'), 'w', encoding='utf8') as err_t, \
+                io.open(os.path.join(self.graphDir, 'results-totals.txt'), 'w', encoding='utf8') as res_t, \
+                io.open(os.path.join(self.graphDir, 'results-daily.txt'), 'w', encoding='utf8') as res_d, \
+                io.open(os.path.join(self.graphDir, 'results-hourly.txt'), 'w', encoding='utf8') as res_hr:
+
+            hdr = u'\t'.join(
+                [u'distinct', u'cont', u'error', u'frequency', u'timestamp', u'count', u'avg', u'min', u'max']) + u'\n'
+            err.write(hdr)
+            err_t.write(hdr)
+            hdr = u'\t'.join(
+                [u'distinct', u'state', u'frequency', u'timestamp', u'count', u'avg', u'min', u'max']) + u'\n'
+            res_t.write(hdr)
+            res_d.write(hdr)
+            res_hr.write(hdr)
+
+            allStats = self.stats['allpartners']
+            for k in sorted(allStats):
+                v = allStats[k]
+                key = k
+                isError = key.startswith(u'err-')
+                if key.endswith(u'_unique'):
+                    key = key[0:-len(u'_unique')]
+                    line = u'unique'
+                elif key.endswith(u'_usrmonth'):
+                    key = key[0:-len(u'_usrmonth')]
+                    line = u'usrmonth'
+                elif key == u'newuser':
+                    line = key
+                else:
+                    line = u'total'
+                line += u'\t'
+                if isError:
+                    k1, k2, k3 = key.split(u'-', 2)
+                    line += k2 + u'\t' + k3
+                elif key in stateNames:
+                    line += stateNames[key]
+                else:
+                    line += key
+                line += u'\t'
+
+                for kk in sorted(v):
+                    vv = v[kk]
+                    if kk == u'_totals':
+                        l = line + u'total\t\t'
+                    else:
+                        l = line + kk.replace(u'_', u'\t') + u'\t'
+                    if vv.countOnly():
+                        l += unicode(vv.count)
+                    else:
+                        l += u'%d\t%g\t%d\t%d' % (vv.count, vv.sum / vv.count, vv.min, vv.max)
+
+                    if kk.startswith(u'hourly_'):
+                        f = err if isError else res_hr
+                    elif kk == u'_totals':
+                        f = err_t if isError else res_t
+                    else:
+                        f = err if isError else res_d
+                    f.write(l + u'\n')
+
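+    # Create graphs/dashboards and graphs/datafiles/<partnerKey> (partnerKey is
+    # the partner's salted hash from partnerDirMap), drop an empty file named
+    # after the partner for easy identification, and write a minimal dashboard
+    # JSON description if one does not already exist.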
+    def makePartnerDir(self, partner):
+        partnerKey = self.partnerDirMap[partner]
+
+        dashboard = os.path.join(self.graphDir, 'dashboards')
+        if not os.path.exists(dashboard):
+            os.mkdir(dashboard)
+        datafiles = os.path.join(self.graphDir, 'datafiles')
+        if not os.path.exists(datafiles):
+            os.mkdir(datafiles)
+        dataDir = os.path.join(datafiles, partnerKey)
+        if not os.path.exists(dataDir):
+            os.mkdir(dataDir)
+
+        # Create an empty file with the partner's name to easily see who is who
+        # From http://stackoverflow.com/questions/295135/turn-a-string-into-a-valid-filename-in-python
+        sanitizedPartner = unicodedata.normalize('NFKD', unicode(partner)).encode('ascii', 'ignore')
+        sanitizedPartner = unicode(re.sub('[^\w\s-]', '', sanitizedPartner).strip().lower())
+        sanitizedPartner = re.sub('[-\s]+', '-', sanitizedPartner)
+        infoFile = os.path.join(dataDir, sanitizedPartner)
+        if not os.path.exists(infoFile): open(infoFile, 'a').close()
+
+        # Create dashboard
+        dashboardFile = os.path.join(dashboard, partnerKey + '.json')
+        if not os.path.exists(dashboardFile):
+            data = {
+                "id": partnerKey,
+                "headline": partner,
+                # "subhead": "subtitle",
+                "tabs": [
+                    {
+                        "name": "Graphs",
+                        "graph_ids": [
+                            "http://gp.wmflabs.org/data/datafiles/gp_zero_local/" + partnerKey + "/states-count-per-day.tsv"
+                        ]
+                    }
+                ]
+            }
+            with open(dashboardFile, 'wb') as f:
+                json.dump(data, f, indent=True, sort_keys=True)
+
+        return dataDir
+
+    def createGraphs(self):
+
+        states = sorted([stateNames[v] for v in goodStates])
+
+        for partner, data in self.stats.items():
+            if partner in self.partnerDirMap:
+                partnerKey = self.partnerDirMap[partner]
+            else:
+                import hashlib
+                partnerKey = hashlib.sha224(partner + self.salt).hexdigest()
+                self.partnerDirMap[partner] = partnerKey
+
+            partnerDir = self.makePartnerDir(partner)
+            self.createStatesGraph(partnerDir, data, states)
+
+    def createStatesGraph(self, partnerDir, data, states):
+        d = sorted(self.filterData(data, yieldTuple=True),
+                   key=lambda v:
+                        v[0] + v[1])
+                        # v[u'date'] + v[u'state'])
+        # groups = groupby(d, key=itemgetter(''))
+        #     [{'type':k, 'items':[x[0] for x in v]} for k, v in groups]
+        # from itertools import groupby, islice
+        # from operator import itemgetter
+        # from collections import defaultdict
+
+        # probably splitting this up in multiple lines would be more readable
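+        # Pivot the sorted (date, state, count) tuples into one row per date:
+        # group by date, then build a state -> count map for that date (states
+        # with no data default to an empty string when written below).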
+        pivot = (
+            (ts,
+             defaultdict(lambda: '', (islice(d, 1, None) for d in dd))
+            )
+            for ts, dd in groupby(d, itemgetter(0)))
+
+        resultFile = os.path.join(partnerDir, 'states-count-per-day.tsv')
+        with io.open(resultFile, 'w', encoding='utf8') as f:
+            f.write(u'date\t' + u'\t'.join(states) + u'\n')
+            for ts, counts in pivot:
+                f.write(ts + u'\t' + u'\t'.join(str(counts[s]) for s in states)+u'\n')
+
+
+    def filterData(self, data, isError=False, isNewUser=False, isUnique=False, knownState=True, includeStats=False, yieldTuple=False):
+        for key, dates in data.items():
+            isErr, state, type = self.splitKey(key)
+            if isErr != isError:
+                continue
+            if (type == u'unique') != isUnique:
+                continue
+            if (type == u'newuser') != isNewUser:
+                continue
+            if (state in stateNames) != knownState:
+                continue
+            state = stateNames[state]
+            for dateStr, e in dates.items():
+                if not dateStr.startswith(u'daily_'):
+                    continue
+                ts = dateStr[len(u'daily_'):]
+                if yieldTuple:
+                    yield (ts, state, e.count) if not includeStats else (ts, state, e.count, e.min, e.avg, e.max)
+                else:
+                    res = {
+                        u'date': ts,
+                        u'state': state,
+                        u'count': e.count,
+                    }
+                    if includeStats:
+                        res[u'avg'] = e.avg
+                        res[u'min'] = e.min
+                        res[u'max'] = e.max
+                    yield res
+
+    def splitKey(self, key):
+        isError = key.startswith(u'err-')
+        if isError:
+            key = key[len(u'err-'):]
+
+        if key.endswith(u'_unique'):
+            key = key[0:-len(u'_unique')]
+            tp = u'unique'
+        elif key.endswith(u'_usrmonth'):
+            key = key[0:-len(u'_usrmonth')]
+            tp = u'usrmonth'
+        elif key == u'newuser':
+            tp = key
+        else:
+            tp = u'total'
+
+        return isError, key, tp
+
+
+if __name__ == '__main__':
+    stats = Stats('state/tmp.tsv', 'graphs', 'state/tmp.json')
+    # stats = Stats('state/combined.tsv', 'graphs', 'state/combined.json')
+
+    # stats.process()
+    #
+    # stats.pickle()
+
+    stats.unpickle()
+
+    stats.createGraphs()
+
+    stats.dumpStats()
+
+    pass
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/144682
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5e45f0ee8a8427307b4250e34cfcd030b7f355d6
Gerrit-PatchSet: 1
Gerrit-Project: analytics/zero-sms
Gerrit-Branch: master
Gerrit-Owner: Yurik <yu...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
