Yurik has uploaded a new change for review. https://gerrit.wikimedia.org/r/144682
Change subject: Initial checkin - some data sanitizing code
......................................................................

Initial checkin - some data sanitizing code

Change-Id: I5e45f0ee8a8427307b4250e34cfcd030b7f355d6
---
A .gitignore
A download.py
A generate.py
3 files changed, 911 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/zero-sms refs/changes/82/144682/1

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0648918
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+/.idea
+/data
+/private.py
+*.pyc
+/ResultGraphs.xlsx
+/state
+/graphs
diff --git a/download.py b/download.py
new file mode 100644
index 0000000..c12ebab
--- /dev/null
+++ b/download.py
@@ -0,0 +1,331 @@
+# coding=utf-8
+import pipes
+import string
+import subprocess
+import locale
+from datetime import datetime, timedelta
+import re
+import os
+import json
+import traceback
+
+from boto.s3.connection import S3Connection
+import io
+import itertools
+import generate
+
+import private
+
+
+def generatePassword(size=10, chars=string.ascii_letters + string.digits):
+    """ Adapted from
+    http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
+    """
+    import random
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+class Processor(object):
+    dateFormat = '%Y-%m-%d'
+
+    def __init__(self, dataDir='data', workDir='state', graphDir='graphs', settingsFile='settings.json'):
+        self.dataDir = dataDir
+        self.workDir = workDir
+        self.graphDir = graphDir
+        if not os.path.exists(dataDir): os.mkdir(dataDir)
+        if not os.path.exists(workDir): os.mkdir(workDir)
+        if not os.path.exists(graphDir): os.mkdir(graphDir)
+
+        self.settingsFilePath = os.path.join(self.workDir, settingsFile)
+        self.combinedFilePath = os.path.join(self.workDir, 'combined.tsv')
+        self.tempFilePath = os.path.join(self.workDir, 'temp.tsv')
+        if os.path.exists(self.tempFilePath): os.remove(self.tempFilePath)
+        self.statsFilePath = os.path.join(self.workDir, 'combined.json')
+        if os.path.exists(self.statsFilePath): os.remove(self.statsFilePath)
+
+        data = self.loadState()
+
+        self.enableDownload = data['enableDownload'] if 'enableDownload' in data else True
+        self.enableDownloadOld = data['enableDownloadOld'] if 'enableDownloadOld' in data else True
+        self.lastDownloadTs = self.parseDate(data['lastDownloadTs']) if 'lastDownloadTs' in data else False
+        self.downloadOverlapDays = data['downloadOverlapDays'] if 'downloadOverlapDays' in data else False
+        self.lastProcessedTs = self.parseDate(data['lastProcessedTs']) if 'lastProcessedTs' in data else False
+        self.processOverlapDays = data['processOverlapDays'] if 'processOverlapDays' in data else 1
+        self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False
+        self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False
+        self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False
+        self.lastErrorTs = self.parseDate(data['lastErrorTs']) if 'lastErrorTs' in data else False
+        self.lastErrorMsg = data['lastErrorMsg'] if 'lastErrorMsg' in data else False
+        self.sortCmd = data['sortCmd'] if 'sortCmd' in data else 'sort'
+        self.lastGoodRunTs = self.parseDate(data['lastGoodRunTs']) if 'lastGoodRunTs' in data else False
+        self.partnerMap = data['partnerMap'] if 'partnerMap' in data else {}
+        self.partnerDirMap = data['partnerDirMap'] if 'partnerDirMap' in data else {}
+        self.salt = data['salt'] if 'salt' in data else generatePassword()
+
+        if self.downloadOverlapDays and self.lastDownloadTs:
+            self.downloadIfAfter = self.lastDownloadTs - timedelta(days=self.downloadOverlapDays)
+        else:
+            self.downloadIfAfter = False
+
+        if self.lastProcessedTs:
+            self.processIfAfter = self.lastProcessedTs - timedelta(days=self.processOverlapDays)
+        else:
+            self.processIfAfter = False
+
+        # wikipedia_application_3.log.2014-06-11
+        self.fileRe = re.compile(ur'^wikipedia_application_\d+\.log\.(?P<date>\d{4}-\d{2}-\d{2})$', re.IGNORECASE)
+
+    def loadState(self):
+        if os.path.isfile(self.settingsFilePath):
+            with io.open(self.settingsFilePath, 'rb') as f:
+                return json.load(f)
+        return {}
+
+    def saveState(self):
+        data = self.loadState()
+        data['enableDownload'] = self.enableDownload
+        data['enableDownloadOld'] = self.enableDownloadOld
+        data['lastDownloadTs'] = self.formatDate(self.lastDownloadTs)
+        data['downloadOverlapDays'] = int(self.downloadOverlapDays) if self.downloadOverlapDays else False
+        data['lastProcessedTs'] = self.formatDate(self.lastProcessedTs)
+        data['processOverlapDays'] = int(self.processOverlapDays) if self.processOverlapDays else False
+        data['smtpHost'] = self.smtpHost
+        data['smtpFrom'] = self.smtpFrom
+        data['smtpTo'] = self.smtpTo
+        data['lastErrorTs'] = self.formatDate(self.lastErrorTs)
+        data['lastErrorMsg'] = self.lastErrorMsg
+        data['sortCmd'] = self.sortCmd
+        data['lastGoodRunTs'] = self.formatDate(self.lastGoodRunTs)
+        data['partnerMap'] = self.partnerMap
+        data['partnerDirMap'] = self.partnerDirMap
+        data['salt'] = self.salt
+
+        stateBk = self.settingsFilePath + '.bak'
+        with open(stateBk, 'wb') as f:
+            json.dump(data, f, indent=True, sort_keys=True)
+        if os.path.exists(self.settingsFilePath):
+            os.remove(self.settingsFilePath)
+        os.rename(stateBk, self.settingsFilePath)
+
+    def parseDate(self, value):
+        return datetime.strptime(str(value), self.dateFormat) if isinstance(value, basestring) else value
+
+    def formatDate(self, value):
+        return value.strftime(self.dateFormat) if isinstance(value, datetime) else value
+
+    def getFileDate(self, filename):
+        m = self.fileRe.match(filename)
+        return self.parseDate(m.group('date')) if m else False
+
+    def download(self):
+        cn = S3Connection(private.aws_access_key_id, private.aws_secret_access_key)
+        prefix = 'prd-vumi-wikipedia.aws.prk-host.net/'
+        bucket = cn.get_bucket(private.bucket_name)
+        files = bucket.list(prefix)
+
+        for key in files:
+            filename = key.key[len(prefix):]
+            filePath = os.path.join(self.dataDir, filename)
+            fileDate = self.getFileDate(filename)
+            fileExists = os.path.exists(filePath)
+
+            if not self.enableDownloadOld and not fileDate:
+                print('Skipping legacy-named file %s' % filename)
+                continue
+            elif key.size == 0:
+                print('Skipping empty file %s' % filename)
+                continue
+            elif not fileExists:
+                reason = u"it doesn't exist"
+            elif key.size != os.stat(filePath).st_size:
+                reason = u'local size %s <> remote %s' % (
+                    locale.format(u"%d", os.stat(filePath).st_size, grouping=True),
+                    locale.format(u"%d", key.size, grouping=True))
+            elif fileDate and self.downloadIfAfter and fileDate > self.downloadIfAfter:
+                reason = u'date is too close to last file date %s' % self.downloadIfAfter
+            else:
+                continue
+
+            print('Downloading %s because %s' % (filename, reason))
+            if fileExists:
+                if os.stat(filePath).st_size == 0:
+                    print('Removing empty file %s' % filePath)
+                    os.remove(filePath)
+                else:
+                    bakCount = 0
+                    bakFile = filePath + '.bak'
+                    while os.path.exists(bakFile):
+                        bakCount += 1
+                        bakFile = filePath + '.bak' + str(bakCount)
+                    print('Renaming %s => %s' % (filePath, bakFile))
+                    os.rename(filePath, bakFile)
+
+            key.get_contents_to_filename(filePath)
+            if fileDate and (not self.lastDownloadTs or self.lastDownloadTs < fileDate):
+                self.lastDownloadTs = fileDate
+
+    def combineDataFiles(self, sourceFiles):
+
+        print 'Combining files into %s' % self.combinedFilePath
+        print 'Processing %s' % (('files on or after %s' % self.processIfAfter) if self.processIfAfter else 'all files')
+        with io.open(self.combinedFilePath, 'a', encoding='utf8') as dst:
+            totalCount = 0
+            for srcFile in sourceFiles:
+
+                fileDate = self.getFileDate(srcFile)
+                if self.processIfAfter:
+                    if not fileDate:
+                        continue  # old style filename, and the processIfAfter is set
+                    elif fileDate <= self.processIfAfter:
+                        continue  # we have already processed this file
+
+                srcFilePath = os.path.join(self.dataDir, srcFile)
+                if not os.path.isfile(srcFilePath):
+                    print 'File %s was not found, skipping' % srcFilePath
+                    continue
+                last = False
+                count = 0
+                for line in io.open(srcFilePath, 'r', encoding='utf8'):
+                    count += 1
+                    totalCount += 1
+                    if count == 1 or totalCount % 30000 == 0:
+                        print('File %s, line %d, total lines %d' % (srcFile, count - 1, totalCount - 1))
+
+                    l = line.strip('\n\r')
+                    if u' WIKI\t' in l:
+                        self.writeLine(dst, last)
+                        last = l
+                    elif len(l) > 2 and l[0] == u'2' and l[1] == u'0':
+                        self.writeLine(dst, last)
+                        last = False
+                    elif isinstance(last, basestring):
+                        last = last + '\t' + l
+
+                self.writeLine(dst, last)
+                if fileDate and (not self.lastProcessedTs or self.lastProcessedTs < fileDate):
+                    self.lastProcessedTs = fileDate
+
+    def writeLine(self, dst, line):
+        if not line:
+            return
+        line = line.replace(u'\0', u'\\0')
+        parts = line.split('\t')
+        if parts[1][0] == u'+':
+            return
+        parts = [p[2:-1]
+                 if (p.startswith(u"u'") and p.endswith(u"'")) or (p.startswith(u'u"') and p.endswith(u'"'))
+                 else p for p in parts]
+        tmp = parts[0]
+        parts[0] = parts[1]
+        parts[1] = tmp \
+            .replace(u' [VumiRedis,client] WIKI', u'') \
+            .replace(u' [HTTP11ClientProtocol,client] WIKI', u'') \
+            .replace(u'+0000', u'')
+
+        if len(parts) > 5 and parts[5].startswith(u'content='):
+            parts[5] = u'content=' + str(len(parts[5]) - 10)
+
+        if len(parts) > 6:
+            parts[6] = parts[6].replace('\0', '\\0')
+
+        dst.write('\t'.join(parts) + '\n')
+
+    def sort(self):
+
+        args = [self.sortCmd, '-u', '-o', self.tempFilePath, self.combinedFilePath]
+        cmd = ' '.join([pipes.quote(v) for v in args])
+        print('\nSorting: %s' % cmd)
+        try:
+            tmp2 = self.tempFilePath + '2'
+            if os.path.exists(tmp2):
+                os.remove(tmp2)
+
+            subprocess.check_output(args, stderr=subprocess.STDOUT)
+
+            # Extra safety - keep old file until we rename temp to its name
+            os.rename(self.combinedFilePath, tmp2)
+            os.rename(self.tempFilePath, self.combinedFilePath)
+            os.remove(tmp2)
+
+        except subprocess.CalledProcessError, ex:
+            raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output))
+
+    def error(self, error):
+        self.lastErrorTs = datetime.now()
+        self.lastErrorMsg = error
+
+        print(error)
+
+        if not self.smtpHost or not self.smtpFrom or not self.smtpTo:
+            return
+
+        import smtplib
+        from email.mime.text import MIMEText
+        from email.mime.multipart import MIMEMultipart
+
+        msg = MIMEMultipart('alternative')
+        msg['Subject'] = 'SMS report error'
+        msg.attach(MIMEText(error, 'plain', 'utf-8'))
+
+        # m = MIMEText(error, 'plain', 'utf-8')
+        # m['From'] = self.smtpFrom
+        # m['To'] = self.smtpTo
+        # m['Subject'] = msg['Subject']
+
+        s = smtplib.SMTP(self.smtpHost)
+        s.sendmail(self.smtpFrom, self.smtpTo, msg.as_string().encode('ascii'))
+        s.quit()
+
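Side note for reviewers: loadState()/saveState() above round-trip all of the processor's knobs through a single JSON file (state/settings.json by default, written with sort_keys=True). A sketch of what a saved state file might look like after a first successful run, with illustrative values only:

    {
     "downloadOverlapDays": false,
     "enableDownload": true,
     "enableDownloadOld": true,
     "lastDownloadTs": "2014-06-11",
     "lastErrorMsg": false,
     "lastErrorTs": false,
     "lastGoodRunTs": "2014-06-12",
     "lastProcessedTs": "2014-06-11",
     "partnerDirMap": {},
     "partnerMap": {},
     "processOverlapDays": 1,
     "salt": "hFz3kQ9mXw",
     "smtpFrom": false,
     "smtpHost": false,
     "smtpTo": false,
     "sortCmd": "sort"
    }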
+    def generateGraphData(self, skipParsing=False):
+        # if ((parts[2] == 'airtel_ke_ussd_transport' and parts[3] == 'ussd' and parts[4] == 'airtel') or
+        #         (parts[2] == 'airtel_ke_sms_transport' and parts[3] == 'sms' and parts[4] == '')):
+        #     parts[2:5] = ['airtel']
+        # elif ((parts[2] == 'vumi_starcode_transport' and parts[3] == 'ussd' and parts[4] == '') or
+        #         (parts[2] == 'smpp_transport' and parts[3] == 'sms' and parts[4] == '')):
+        #     parts[2:5] = ['vumi']
+        # elif parts[2] == 'zambia_cellulant_ussd_transport' and parts[3] == 'ussd' and parts[4] == '':
+        #     parts[2:5] = ['zambia-cellulant']
+        # elif parts[2] == 'ambient_go_smpp_transport' and parts[3] == 'sms' and parts[4] == '':
+        #     parts[2:5] = ['ambient_go']
+        # elif ((parts[2] == 'truteq_8864_transport' or parts[2] == 'truteq_32323_transport') and parts[3] == 'ussd' and
+        #         parts[4] == ''):
+        #     parts[2:5] = ['truteq']
+        # elif parts[2] == 'equity_kenya_ussd_smpp_transport' and parts[3] == 'ussd' and parts[4] == '':
+        #     parts[2:5] = ['equity_ke']
+        # else:
+        #     raise BaseException(line)
+
+        stats = generate.Stats(self.combinedFilePath, self.graphDir, self.statsFilePath, self.partnerMap, self.partnerDirMap, self.salt)
+
+        if not skipParsing:
+            print('\nParsing data')
+            stats.process()
+            stats.pickle()
+        else:
+            print('Loading parsed data')
+            stats.unpickle()
+
+        print('Generating data files to %s' % self.graphDir)
+        # stats.dumpStats()
+        stats.createGraphs()
+
+    def run(self):
+        # noinspection PyBroadException
+        try:
+            # if self.enableDownload:
+            #     self.download()
+            # files = os.listdir(self.dataDir)
+            # # files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.dataDir, 'pc'))], files)
+            # self.combineDataFiles(files)
+            # self.sort()
+
+            self.generateGraphData(True)
+            self.lastGoodRunTs = datetime.now()
+        except:
+            self.error(traceback.format_exc())
+        self.saveState()
+
+
+if __name__ == "__main__":
+    prc = Processor()
+    prc.run()
\ No newline at end of file
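download.py imports a `private` module that is deliberately kept out of the repository (`/private.py` in the .gitignore above). Judging by the attributes referenced in download(), a minimal local copy would look something like this (placeholder values, not part of this change):

    # private.py - local credentials, never committed (see .gitignore)
    aws_access_key_id = 'AKIA...'           # S3 access key
    aws_secret_access_key = '...'           # S3 secret key
    bucket_name = 'example-log-bucket'      # S3 bucket holding the vumi logs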
diff --git a/generate.py b/generate.py
new file mode 100644
index 0000000..4c585a2
--- /dev/null
+++ b/generate.py
@@ -0,0 +1,573 @@
+import io
+import json
+from operator import itemgetter
+import os
+from datetime import *
+from collections import defaultdict
+from itertools import *
+
+# Daily totals -
+#
+# A. Number of sessions initiated
+# B. Number of unique users initiating sessions
+# C. Number of sessions cancelled at search box
+# D. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's
+# E. Number of sessions initiated and sent an SMS response, and had no option to reply for more
+# F. Number of sessions initiated and sent an SMS response, and were presented with an option to reply for more
+# G. Of the number above ('F'), how many of those opted to receive more information
+# H. Average number of 'reply for more' requests during a single session (from the set of users in 'D')
+# I. Average number of SMS's sent (from the set of users in 'D')
+# J. Total number of SMS's sent (from the set of users in 'D')
+#
+# Hourly totals for 24-hour daily period
+#
+# A. Number of sessions initiated
+# B. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's
+import re
+import unicodedata
+
+stateNames = {
+    u'start': u'0 start',
+    u'titles': u'1 titles',
+    u'section-invalid': u'1 wrong title',
+    u'section': u'2 sections',
+    u'content-invalid': u'2 wrong section',
+    u'ussdcontent': u'3 ussd content',
+    u'smscontent': u'3 sms content',
+    u'smscontent-2': u'3 sms content (2)',
+    u'smscontent-3+': u'3 sms content (3+)',
+    u'more-no-content': u'4 no-more-content',
+    u'more-no-session': u'5 no-more-session',
+    u'more-no-session-2': u'5 no-more-session (2)',
+    u'more-no-session-3+': u'5 no-more-session (3+)',
+    u'newuser': u'new user',
+}
+
+goodStates = [
+    u'start',
+    u'titles',
+    u'section',
+    u'smscontent',
+    u'smscontent-2',
+    u'smscontent-3+',
+]
+
+# State machine:
+#
+# start -> titles -> section -> ussdcontent -> smscontent(+) -> more-no-content
+#             v          v
+#   section-invalid   content-invalid
+#
+# NOTES:
+# Sometimes smscontent appears in the logs before ussdcontent
+#
+okTransitions = {
+    (u'', u'start'),
+    (u'start', u'titles'),
+    (u'titles', u'section'),
+    (u'titles', u'section-invalid'),
+    (u'section', u'ussdcontent'),
+    (u'section', u'smscontent'),
+    (u'section', u'content-invalid'),
+    (u'ussdcontent', u'smscontent'),
+    (u'smscontent', u'ussdcontent'),
+    (u'smscontent', u'smscontent'),
+    (u'smscontent', u'more-no-content'),
+    (u'smscontent', u'more-no-session'),
+    (u'more-no-content', u'more-no-session'),
+    (u'more-no-session', u'more-no-session'),
+}
+
+multiactions = {u'smscontent', u'more-no-content', u'more-no-session'}
+
+entrySpecials = {'id', 'ts', 'partner'}
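The okTransitions table above is what Stats.process() further down uses to flag broken sessions: every consecutive (lastAction, action) pair must appear in the set, with u'' standing for "no previous action". A small illustration with a made-up session:

    # a hypothetical well-formed session - every consecutive pair is in okTransitions
    actions = [u'start', u'titles', u'section', u'smscontent', u'more-no-session']
    last = u''
    for action in actions:
        assert (last, action) in okTransitions
        last = action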
+
+
+class Entry(object):
+    def __init__(self, userId, ts, partner):
+        self.id = userId
+        self.ts = ts
+        self.partner = partner
+
+    def __setitem__(self, key, item):
+        self.__dict__[key] = item
+
+    def __getitem__(self, key):
+        return self.__dict__[key]
+
+    def __iter__(self):
+        return iter(self.__dict__)
+
+    def entryItems(self):
+        for v in self.__dict__.items():
+            if v[0] not in entrySpecials:
+                yield v
+
+    def __repr__(self):
+        return repr(self.__dict__)
+
+    def __len__(self):
+        return len(self.__dict__)
+
+    def __delitem__(self, key):
+        del self.__dict__[key]
+
+    def keys(self):
+        return self.__dict__.keys()
+
+    def values(self):
+        return self.__dict__.values()
+
+    def __cmp__(self, other):
+        return cmp(self.__dict__, other)
+
+    def __contains__(self, item):
+        return item in self.__dict__
+
+    def add(self, key, value):
+        self.__dict__[key] = value
+
+    def __call__(self):
+        return self.__dict__
+
+    def __unicode__(self):
+        return unicode(repr(self.__dict__))
+
+    def items(self):
+        return self.__dict__.items()
+
+
+# class SumEntryEncoder(json.JSONEncoder):
+#     def default(self, o):
+#         return super(SumEntryEncoder, self).default(o)
+
+
+class SumEntry(object):
+    def __init__(self, value=-1):
+        """
+        :type value: dict|int
+        """
+        if isinstance(value, dict):
+            self.__dict__ = value
+        else:
+            self.count = 1
+            if value >= 0:
+                self.sum = value
+                self.min = value
+                self.max = value
+
+    def addValue(self, value):
+        self.count += 1
+        if value >= 0:
+            self.sum += value
+            if self.min > value:
+                self.min = value
+            if self.max < value:
+                self.max = value
+
+    def countOnly(self):
+        return 'sum' not in self.__dict__
+
+
+class Stats(object):
+    def __init__(self, sourceFile, graphDir, stateFile, partnerMap=None, partnerDirMap=None, salt=''):
+        self.sourceFile = sourceFile
+        self.graphDir = graphDir
+        self.stateFile = stateFile
+        self.partnerMap = partnerMap if partnerMap is not None else {}
+        self.partnerDirMap = partnerDirMap if partnerDirMap is not None else {}
+        self.salt = salt
+        self.stats = {}
+
+    def _addStats(self, partner, stage, key2, value=-1):
+        p = partner if partner is not None else 'allpartners'
+        try:
+            partnerStat = self.stats[p]
+        except KeyError:
+            partnerStat = dict()
+            self.stats[p] = partnerStat
+        try:
+            stat = partnerStat[stage]
+        except KeyError:
+            stat = dict()
+            partnerStat[stage] = stat
+        if key2 not in stat:
+            stat[key2] = SumEntry(value)
+        else:
+            stat[key2].addValue(value)
+        # two-stage addition - one for partner, one total
+        if partner is not None:
+            self._addStats(None, stage, key2, value)
+
+    def _cleanupStats(self):
+        del self.unique
+        del self.newUserUnique
+        for partner, partnerData in self.stats.items():
+            if partner == 'allpartners':
+                continue
+            for k in list([i for i in partnerData if u'_usrmonth_' in i]):
+                kk = k.split(u'_', 2)
+                # removing top 1% of the heavy users
+                vals = list(sorted(partnerData[k].values(), key=lambda l: l.count))
+                vals = vals[0:len(vals) - int(len(vals) * 0.01)]
+                for v in vals:
+                    self._addStats(partner, kk[0] + u'_' + kk[1], kk[1] + u'_' + kk[2], v.count)
+                del partnerData[k]
+                if k in self.stats['allpartners']:
+                    del self.stats['allpartners'][k]
+
+    def _addStatsUniqueUser(self, partner, key2, userId):
+        if userId not in self.newUserUnique:
+            self.newUserUnique.add(userId)
+            self._addStats(partner, u'newuser', key2)
+
+    def _addStatsUnique(self, partner, stage, key2, userId):
+        u = self.unique[stage]
+        if key2 not in u:
+            u[key2] = {userId}
+        elif userId not in u[key2]:
+            u[key2].add(userId)
+        else:
+            return
+        self._addStats(partner, stage, key2)
+
+    # if not isError:
+    #     key2 = u'hourly_' + ts.strftime(u'%Y-%m-%d %H') + u':00'
+    #     self._addStats(partner, key, key2, value)
+
+    def addStats(self, partner, stage, ts, userId, value=-1, isError=False):
+        # self._addStats(partner, key, u'_totals', value)
+        # self._addStatsUnique(partner, stage + u'_unique', u'_totals', id)
+
+        key2 = u'daily_' + ts.strftime(u'%Y-%m-%d')
+        self._addStats(partner, stage, key2, value)
+        self._addStatsUnique(partner, stage + u'_unique', key2, userId)
+
+        self._addStats(partner, stage + u'_usrmonth_' + ts.strftime(u'%m-%Y'), userId)
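_addStats()/addStats() above build a three-level mapping, partner -> stage -> period key -> SumEntry, with every data point mirrored under the synthetic 'allpartners' partner. Roughly, with invented numbers ('airtel' is one of the partner names from the commented-out block in download.py):

    # hypothetical shape of self.stats after processing (values invented):
    # {'airtel':      {u'start':        {u'daily_2014-06-11': SumEntry with count=80},
    #                  u'start_unique': {u'daily_2014-06-11': SumEntry with count=62}},
    #  'allpartners': {u'start':        {u'daily_2014-06-11': SumEntry with count=120}}}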
+
+    def countStats(self, entry):
+        ts = entry.ts
+        userId = entry.id
+        self.addStats(entry.partner, u'start', ts, userId)
+        self._addStatsUniqueUser(entry.partner, u'daily_' + ts.strftime(u'%Y-%m-%d'), userId)
+        for k, v in entry.entryItems():
+            if type(v) is list:
+                maxN = 2
+                for i in range(min(len(v), maxN + 1)):
+                    self.addStats(entry.partner,
+                                  k + (u'' if i == 0 else '-' + str(i + 1) + ('+' if i == maxN else '')),
+                                  ts, userId, v[i])
+            else:
+                self.addStats(entry.partner, k, ts, userId, v)
+
+    def process(self):
+        self.unique = defaultdict(dict)
+        self.newUserUnique = set()
+
+        cId = 0
+        cTime = 1
+        cPartner = 4
+        cAction = 5
+        cContent = 6
+
+        fErr = io.open(os.path.join(self.graphDir, 'errors.txt'), 'w', encoding='utf8')
+
+        isError = False
+        lastAction = u''
+        lastLine = u''
+        lastParts = False
+        entry = None
+        for line in io.open(self.sourceFile, encoding='utf8'):
+            if line == u'':
+                break
+            if line == lastLine:
+                continue
+            lastLine = line
+
+            parts = [v.strip() for v in line.split(u'\t')]
+            if len(parts) > cContent and parts[cContent].startswith(u'content='):
+                parts[cContent] = u'content=' + str(len(parts[cContent]) - 10) + u'chars'
+
+            action = parts[cAction]
+            timestamp = datetime.strptime(parts[cTime], u'%Y-%m-%d %H:%M:%S')
+            isNew = entry is None or entry.id != parts[cId] or action == u'start'
+
+            if isNew:
+                if entry is not None:
+                    self.countStats(entry)
+                partnerKey = u'|'.join(parts[cPartner - 2:cPartner + 1])
+                if partnerKey in self.partnerMap:
+                    partner = self.partnerMap[partnerKey]
+                else:
+                    if parts[cPartner] == u'':
+                        partner = u'-'.join(parts[cPartner - 2:cPartner])
+                    else:
+                        partner = parts[cPartner]
+                    self.partnerMap[partnerKey] = partner
+                entry = Entry(parts[cId], timestamp, partner)
+                lastParts = parts
+                lastAction = u''
+                isError = False
+
+            transition = (lastAction, action)
+            secondsFromStart = int((timestamp - entry.ts).total_seconds())
+            isMultiAction = action in multiactions
+            isNewAction = action not in entry
+
+            if isError or transition not in okTransitions or (not isNewAction and not isMultiAction):
+                if lastParts:
+                    fErr.write(u'\n' + lastParts[0] + u'\n' + (u'\t'.join(lastParts[1:])) + u'\n')
+                parts[cPartner] = str(secondsFromStart)
+                del parts[cId]
+                fErr.write(u'\t'.join(parts) + u'\n')
+                self.addStats(entry.partner, u'err--bad-transitions', timestamp, entry.id, secondsFromStart,
+                              isError=True)
+                key = (u'err-cont-' if isError else u'err-new-') + transition[0] + u'-' + transition[1]
+                self.addStats(entry.partner, key, timestamp, entry.id, secondsFromStart, isError=True)
+                lastParts = False
+                isError = True
+            elif not isNew:
+                # noinspection PyTypeChecker
+                lastParts = lastParts + [str(secondsFromStart)] + parts[cAction:]
+                if isNewAction:
+                    entry[action] = [secondsFromStart] if isMultiAction else secondsFromStart
+                else:
+                    entry[action].append(secondsFromStart)
+
+            lastAction = action
+
+        if lastParts:
+            self.countStats(entry)
+
+        self._cleanupStats()
+
+    def pickle(self):
+        with open(self.stateFile, 'wb') as f:
+            self.recursiveConvert(self.stats)
+            json.dump(self.stats, f, indent=True, sort_keys=True)
+            self.recursiveConvert(self.stats)
+
+    def unpickle(self):
+        with io.open(self.stateFile, 'rb') as f:
+            self.stats = json.load(f)
+            self.recursiveConvert(self.stats)
+
+    def recursiveConvert(self, d):
+        for (k, v) in d.items():
+            if isinstance(v, dict):
+                if 'count' in v:
+                    d[k] = SumEntry(v)
+                else:
+                    self.recursiveConvert(v)
+            elif isinstance(v, SumEntry):
+                d[k] = v.__dict__
+
+    def dumpStats(self):
+
+        with io.open(os.path.join(self.graphDir, 'results-err.txt'), 'w', encoding='utf8') as err, \
+                io.open(os.path.join(self.graphDir, 'results-err-totals.txt'), 'w', encoding='utf8') as err_t, \
+                io.open(os.path.join(self.graphDir, 'results-totals.txt'), 'w', encoding='utf8') as res_t, \
+                io.open(os.path.join(self.graphDir, 'results-daily.txt'), 'w', encoding='utf8') as res_d, \
+                io.open(os.path.join(self.graphDir, 'results-hourly.txt'), 'w', encoding='utf8') as res_hr:
+
+            hdr = u'\t'.join(
+                [u'distinct', u'cont', u'error', u'frequency', u'timestamp', u'count', u'avg', u'min', u'max']) + u'\n'
+            err.write(hdr)
+            err_t.write(hdr)
+            hdr = u'\t'.join(
+                [u'distinct', u'state', u'frequency', u'timestamp', u'count', u'avg', u'min', u'max']) + u'\n'
+            res_t.write(hdr)
+            res_d.write(hdr)
+            res_hr.write(hdr)
+
+            allStats = self.stats['allpartners']
+            for k in sorted(allStats):
+                v = allStats[k]
+                key = k
+                isError = key.startswith(u'err-')
+                if key.endswith(u'_unique'):
+                    key = key[0:-len(u'_unique')]
+                    line = u'unique'
+                elif key.endswith(u'_usrmonth'):
+                    key = key[0:-len(u'_usrmonth')]
+                    line = u'usrmonth'
+                elif key == u'newuser':
+                    line = key
+                else:
+                    line = u'total'
+                line += u'\t'
+                if isError:
+                    k1, k2, k3 = key.split(u'-', 2)
+                    line += k2 + u'\t' + k3
+                elif key in stateNames:
+                    line += stateNames[key]
+                else:
+                    line += key
+                line += u'\t'
+
+                for kk in sorted(v):
+                    vv = v[kk]
+                    if kk == u'_totals':
+                        l = line + u'total\t\t'
+                    else:
+                        l = line + kk.replace(u'_', u'\t') + u'\t'
+                    if vv.countOnly():
+                        l += unicode(vv.count)
+                    else:
+                        l += u'%d\t%g\t%d\t%d' % (vv.count, vv.sum / vv.count, vv.min, vv.max)
+
+                    if kk.startswith(u'hourly_'):
+                        f = err if isError else res_hr
+                    elif kk == u'_totals':
+                        f = err_t if isError else res_t
+                    else:
+                        f = err if isError else res_d
+                    f.write(l + u'\n')
+
+    def makePartnerDir(self, partner):
+        partnerKey = self.partnerDirMap[partner]
+
+        dashboard = os.path.join(self.graphDir, 'dashboards')
+        if not os.path.exists(dashboard):
+            os.mkdir(dashboard)
+        datafiles = os.path.join(self.graphDir, 'datafiles')
+        if not os.path.exists(datafiles):
+            os.mkdir(datafiles)
+        dataDir = os.path.join(datafiles, partnerKey)
+        if not os.path.exists(dataDir):
+            os.mkdir(dataDir)
+
+        # Create an empty file with the partner's name to easily see who is who
+        # From http://stackoverflow.com/questions/295135/turn-a-string-into-a-valid-filename-in-python
+        sanitizedPartner = unicodedata.normalize('NFKD', unicode(partner)).encode('ascii', 'ignore')
+        sanitizedPartner = unicode(re.sub('[^\w\s-]', '', sanitizedPartner).strip().lower())
+        sanitizedPartner = re.sub('[-\s]+', '-', sanitizedPartner)
+        infoFile = os.path.join(dataDir, sanitizedPartner)
+        if not os.path.exists(infoFile): open(infoFile, 'a').close()
+
+        # Create dashboard
+        dashboardFile = os.path.join(dashboard, partnerKey + '.json')
+        if not os.path.exists(dashboardFile):
+            data = {
+                "id": partnerKey,
+                "headline": partner,
+                # "subhead": "subtitle",
+                "tabs": [
+                    {
+                        "name": "Graphs",
+                        "graph_ids": [
+                            "http://gp.wmflabs.org/data/datafiles/gp_zero_local/" + partnerKey + "/states-count-per-day.tsv"
+                        ]
+                    }
+                ]
+            }
+            with open(dashboardFile, 'wb') as f:
+                json.dump(data, f, indent=True, sort_keys=True)
+
+        return dataDir
+
+    def createGraphs(self):
+
+        states = sorted([stateNames[v] for v in goodStates])
+
+        for partner, data in self.stats.items():
+            if partner in self.partnerDirMap:
+                partnerKey = self.partnerDirMap[partner]
+            else:
+                import hashlib
+                partnerKey = hashlib.sha224(partner + self.salt).hexdigest()
+                self.partnerDirMap[partner] = partnerKey
+
+            partnerDir = self.makePartnerDir(partner)
+            self.createStatesGraph(partnerDir, data, states)
+
+    def createStatesGraph(self, partnerDir, data, states):
+        d = sorted(self.filterData(data, yieldTuple=True),
+                   key=lambda v:
+                   v[0] + v[1])
+        # v[u'date'] + v[u'state'])
+        # groups = groupby(d, key=itemgetter(''))
+        # [{'type':k, 'items':[x[0] for x in v]} for k, v in groups]
+        # from itertools import groupby, islice
+        # from operator import itemgetter
+        # from collections import defaultdict
+
+        # probably splitting this up in multiple lines would be more readable
+        pivot = (
+            (ts,
+             defaultdict(lambda: '', (islice(d, 1, None) for d in dd))
+             )
+            for ts, dd in groupby(d, itemgetter(0)))
+
+        resultFile = os.path.join(partnerDir, 'states-count-per-day.tsv')
+        with io.open(resultFile, 'w', encoding='utf8') as f:
+            f.write(u'date\t' + u'\t'.join(states) + u'\n')
+            for ts, counts in pivot:
+                f.write(ts + u'\t' + u'\t'.join(str(counts[s]) for s in states) + u'\n')
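The pivot generator in createStatesGraph() is the dense spot that its own inline comment flags; an equivalent, more explicit version of the same pivot (illustrative rewrite, behavior unchanged):

    # equivalent to the pivot generator above: group the sorted
    # (date, state, count) tuples by date, then map state -> count per date
    pivot = []
    for ts, rows in groupby(d, itemgetter(0)):
        counts = defaultdict(lambda: '')
        for row in rows:
            counts[row[1]] = row[2]  # state -> count
        pivot.append((ts, counts))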
+
+    def filterData(self, data, isError=False, isNewUser=False, isUnique=False, knownState=True, includeStats=False, yieldTuple=False):
+        for key, dates in data.items():
+            isErr, state, type = self.splitKey(key)
+            if isErr != isError:
+                continue
+            if (type == u'unique') != isUnique:
+                continue
+            if (type == u'newuser') != isNewUser:
+                continue
+            if (state in stateNames) != knownState:
+                continue
+            state = stateNames[state]
+            for dateStr, e in dates.items():
+                if not dateStr.startswith(u'daily_'):
+                    continue
+                ts = dateStr[len(u'daily_'):]
+                if yieldTuple:
+                    yield (ts, state, e.count) if not includeStats else (ts, state, e.count, e.min, e.sum / e.count, e.max)
+                else:
+                    res = {
+                        u'date': ts,
+                        u'state': state,
+                        u'count': e.count,
+                    }
+                    if includeStats:
+                        res[u'avg'] = e.sum / e.count
+                        res[u'min'] = e.min
+                        res[u'max'] = e.max
+                    yield res
+
+    def splitKey(self, key):
+        isError = key.startswith(u'err-')
+        if isError:
+            key = key[len(u'err-'):]
+
+        if key.endswith(u'_unique'):
+            key = key[0:-len(u'_unique')]
+            tp = u'unique'
+        elif key.endswith(u'_usrmonth'):
+            key = key[0:-len(u'_usrmonth')]
+            tp = u'usrmonth'
+        elif key == u'newuser':
+            tp = key
+        else:
+            tp = u'total'
+
+        return isError, key, tp
+
+
+if __name__ == '__main__':
+    stats = Stats('state/tmp.tsv', 'graphs', 'state/tmp.json')
+    # stats = Stats('state/combined.tsv', 'graphs', 'state/combined.json')
+
+    # stats.process()
+    #
+    # stats.pickle()
+
+    stats.unpickle()
+
+    stats.createGraphs()
+
+    stats.dumpStats()
+
+    pass
\ No newline at end of file

--
To view, visit https://gerrit.wikimedia.org/r/144682
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5e45f0ee8a8427307b4250e34cfcd030b7f355d6
Gerrit-PatchSet: 1
Gerrit-Project: analytics/zero-sms
Gerrit-Branch: master
Gerrit-Owner: Yurik <yu...@wikimedia.org>