Author: humbedooh
Date: Wed Apr  5 17:44:33 2023
New Revision: 1908988

URL: http://svn.apache.org/viewvc?rev=1908988&view=rev
Log:
Fix up mailglomper to work with lists.a.o. Initial work; more to be done.
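For context: the script stops scraping raw mbox files from mail-archives.us.apache.org and instead queries the lists.apache.org stats API once per list and month. A minimal sketch of the new endpoint, with placeholder list/domain/month values; the "emails" response field is an assumption inferred from the new weekly_stats() code in the diff below:

    import requests

    # Hypothetical example values; any Apache list/domain/month works here.
    url = ("https://lists.apache.org/api/stats.json"
           "?list=dev&domain=httpd.apache.org&d=2023-04")
    stats = requests.get(url, timeout=30).json()
    print("%u emails in 2023-04" % len(stats.get("emails", [])))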
Modified:
    comdev/reporter.apache.org/trunk/scripts/mailglomper2.py

Modified: comdev/reporter.apache.org/trunk/scripts/mailglomper2.py
URL: http://svn.apache.org/viewvc/comdev/reporter.apache.org/trunk/scripts/mailglomper2.py?rev=1908988&r1=1908987&r2=1908988&view=diff
==============================================================================
--- comdev/reporter.apache.org/trunk/scripts/mailglomper2.py (original)
+++ comdev/reporter.apache.org/trunk/scripts/mailglomper2.py Wed Apr  5 17:44:33 2023
@@ -1,27 +1,27 @@
-# OBSOLETE OBSOLETE OBSOLETE OBSOLETE OBSOLETE OBSOLETE OBSOLETE OBSOLETE
-
 """
 Reads public mailing list data from
-    http://mail-archives.us.apache.org/mod_mbox/
+    https://lists.apache.org/
-    listing of mailboxes and from each:
-    http://mail-archives.us.apache.org/mod_mbox/<list>/yyyymm.mbox
+    https://lists.apache.org/api/stats.json?list={listpart}&domain={project}.apache.org&d=yyyy-mm
-    messages per week and per last two rolling quarters (92 days)
-
+
 Updates:
 data/maildata_extended.json - output data for display
 data/cache/maildata_weekly.json - cache of weekly email stats
 """
 import sys
+
 if sys.hexversion < 0x03000000:
     raise ImportError("This script requires Python 3")
-import re, json, os, time, email.utils, signal, calendar
+import json
+import time
+import signal
 from datetime import datetime
-import urlutils
-import urllib.error
 import traceback
-import errtee
-import committee_info
+import asyncio
+import aiohttp
+import requests

 SECS_PER_DAY = 86400
 SECS_PER_WEEK = 604800
@@ -32,33 +32,37 @@ __RAO_HOME = "../"
 __MAILDATA_EXTENDED = __RAO_HOME + "data/maildata_extended.json"
-__MAILDATA_CACHE = __RAO_HOME + "data/cache/maildata_weekly.json"
+__MAILDATA_CACHE = __RAO_HOME + "data/cache/maildata_weekly.json"

-def tsprint(s): # print with timestamp
+
+def tsprint(s):  # print with timestamp
     msg = "%s %s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), s)
     if isinstance(s, Exception):
         print(msg, file=sys.stderr)
-        type, value, tb = sys.exc_info()
-        traceback.print_exception(type, value, tb)
+        etype, value, tb = sys.exc_info()
+        traceback.print_exception(etype, value, tb)
     else:
         print(msg)

+
 interrupted = False

+
 def handle(signum, frame):
     """
     Handles signals, e.g. ^C (SIGINT) and kill (SIGTERM)
     Sets the interrupted flag on first signal (graceful shutdown)
     Raises KeyboardInterrupt on second signal (abrupt shutdown)
     """
-    global interrupted # otherwise handler does not set the same variable
+    global interrupted  # otherwise handler does not set the same variable
     if signum == 2:
-        print("") # ensure newline after ^C
+        print("")  # ensure newline after ^C
     tsprint("Interrupted with %d" % signum)
-    if interrupted: # second interrupt
-        raise KeyboardInterrupt # not generated by Python because we catch its signal
+    if interrupted:  # second interrupt
+        raise KeyboardInterrupt  # not generated by Python because we catch its signal
     interrupted = True

+
 tsprint("Start")
 startTime = time.time()
@@ -66,7 +70,7 @@ startTime = time.time()
 mls = {}
 try:
-    with open(__MAILDATA_CACHE,'r') as f:
+    with open(__MAILDATA_CACHE, "r") as f:
         mldcache = json.loads(f.read())
     tsprint("Read maildata cache successfully")
 except:
@@ -78,11 +82,11 @@ currentMonth = DTNOW.month
 currentYear = DTNOW.year
 NOW = time.time()
-after = NOW - (SECS_PER_DAY*92)
-wayafter = NOW - (SECS_PER_DAY*92*2)
+after = NOW - (SECS_PER_DAY * 92)
+wayafter = NOW - (SECS_PER_DAY * 92 * 2)

 months = []
-for i in range(0,7):
+for i in range(0, 7):
     date = "%04u%02u" % (currentYear, currentMonth)
     currentMonth -= 1
     if currentMonth == 0:
@@ -91,7 +95,7 @@ for i in range(0,7):
     months.append(date)

 # Remove any stale entries
-obsolete = [] # list of keys to remove (cannot do it while iterating)
+obsolete = []  # list of keys to remove (cannot do it while iterating)
 for mld in mldcache.keys():
     yyyymm = mld[-6:]
     if yyyymm not in months:
@@ -101,104 +105,36 @@ for key in obsolete:
     tsprint("Dropping obsolete cache entry: " + key)
     del mldcache[key]

-fc = urlutils.UrlCache(interval=30, silent=True) # generates too much output if not silent

 # Get the index of mailing lists
 # Not strictly necessary to cache this, but it makes testing easier
-data = fc.get("http://mail-archives.us.apache.org/mod_mbox/", "mod_mbox.html", encoding='utf-8').read()
+data = requests.get("https://lists.apache.org/api/preferences.json").json()
 tsprint("Fetched %u bytes of main data" % len(data))

-"""
-N.B. The project name empire-db is truncated to empire in the main list
-
-Rather than fixing this here, it is done in the scripts that read the output file
-This is because those scripts assume that the first hyphen separates the
-project name from the mailing list name.
-Since list names may contain hyphens (e.g. lucene-net-dev) that is a necessary assumption.
-
-Potentially the generated file could use a separator that is not allowed in project names,
-but this would require converting the input file and potentially allowing both separators in
-the files that process the output for a short while.
-"""
-def weekly_stats(ml, date):
+
+async def weekly_stats(client, listpart, domainpart, date):
     """
-    Read the weekly stats from a mbox file, caching the counts.
""" - fname = "%s-%s" % (ml, date) - stampold = None - lengthold = None -# etagold = None - if fname in mldcache: -# tsprint("Have json cache for: " + fname) - entry = mldcache[fname] - ct = entry['ct'] - stampold = entry['stamp'] - try: - lengthold = entry['length'] - except: - lengthold = 'N/A' -# try: -# etagold = entry['etag'] -# except: -# etagold = 'N/A' - weekly = {} - # JSON keys are always stored as strings; fix these up for main code - for w in entry['weekly']: - weekly[int(w)] = entry['weekly'][w] - else: -# tsprint("Not cached: " + fname) - ct = None - pass - - url = "http://mail-archives.us.apache.org/mod_mbox/%s/%s.mbox" % (ml, date) - stamp, mldata = urlutils.getIfNewer(url, stampold, silent=True) # read binary URL + project = domainpart.replace(".apache.org", "") + if not project: + return {} + ldate = date[:4] + "-" + date[4:] # Convert to lists.a.o format, yyyy-mm + url = f"https://lists.apache.org/api/stats.json?list={listpart}&domain={domainpart}&d={ldate}" + mldata = await (await client.get(url)).json() + weekly = {} + count = 0 + if mldata: # we have a new/updated file to process + for email in mldata.get("emails", []): + timestamp = email["epoch"] + count += 1 + try: + rounded = timestamp - (timestamp % SECS_PER_WEEK) + SECS_PER_WEEK + weekly[rounded] = weekly.get(rounded, 0) + 1 + except Exception as err: + tsprint(err) + return count, weekly - if mldata: # we have a new/updated file to process - try: - length = mldata.headers['Content-Length'] - except: - length = 'unknown' -# try: -# etag = mldata.headers['Etag'] -# except: -# etag = 'unknown' - tsprint("Processing %s (%s > %s) Length: %s (%s)" % (fname, stamp, stampold, length, lengthold)) - # INFRA-11661 - spurious date changes so we check the length - if ct != None and length == lengthold: - tsprint("Unchanged length, using cached data (%d)" % ct) - mldata.close() - mldcache[fname]['stamp'] = stamp # update the stamp so we don't keep trying to download the same file - return ct, weekly - - ct = 0 - weekly = {} - l = 0 - for line in mldata: - l += 1 - c = re.match(b"^From \S+ (.+)", line) # match as binary - if c: - ct += 1 - try: - d = email.utils.parsedate(c.group(1).decode('latin1')) # convert match to string - timestamp = int(calendar.timegm(d)) - rounded = timestamp - (timestamp % SECS_PER_WEEK) + SECS_PER_WEEK - weekly[rounded] = (weekly[rounded] if rounded in weekly else 0) + 1 - except Exception as err: - tsprint(err) - # create the cache entry - mldcache[fname] = { - 'ct': ct, - 'weekly': weekly, - 'stamp': stamp, - 'length': length -# 'etag': etag - } - else: -# tsprint("Returning cache for: " + fname) - pass - # return the new or cached values - return ct, weekly def add_weeks(total, add): for e in add: @@ -207,90 +143,79 @@ def add_weeks(total, add): else: total[e] = add[e] + tsprint("Started") -signal.signal(signal.SIGINT, handle) # This stops Python from raising KeyboardInterrupt +signal.signal(signal.SIGINT, handle) # This stops Python from raising KeyboardInterrupt signal.signal(signal.SIGTERM, handle) -pmcmails = committee_info.PMCmails() -if 'empire-db' in pmcmails: # append entry - pmcmails.append('empire') - -# get all the mailing lists so we can drop only those that are no longer present even if the process is stopped early -mlists = [] -for mlist in re.finditer(r"<a href='([-a-z0-9]+)/'", data): - ml = mlist.group(1) - pfx = ml.split('-')[0] - # skip all but current projects - if not pfx in pmcmails: -# tsprint("Skipping " + ml) # temporary for checking - continue - mlists.append(ml) - 
-lastCheckpoint = time.time() # when output files were last saved
-for ml in mlists:
-    tsprint("Processing: " + ml)
-    start = time.time()
-    mls[ml] = {}
-    mls[ml]['quarterly'] = [0, 0];
-    mls[ml]['weekly'] = {}
-
-    mlct = 0
-    try:
-        for date in months:
-            key = ml + "-" + date
-            try:
-                begin = time.time()
-                ct, weeks = weekly_stats(ml, date)
-                add_weeks(mls[ml]['weekly'], weeks)
+
+async def gather_stats():
+    # get all the mailing lists
+    mlists = []
+    for domainpart, lists in sorted(data["lists"].items()):
+        if ".apache.org" in domainpart:  # Only project domains
+            mlists.extend([f"{listpart}@{domainpart}" for listpart in lists])
+
+    lastCheckpoint = time.time()  # when output files were last saved
+    for mailinglist in mlists:
+        listpart, domainpart = mailinglist.split("@", maxsplit=1)
+        project = domainpart.replace(".apache.org", "")
+        ml = f"{project}-{listpart}"
+        start = time.time()
+
+        mls[ml] = {}
+        mls[ml]["quarterly"] = [0, 0]
+        mls[ml]["weekly"] = {}
+
+        mlct = 0
+
+        try:
+            tsprint(f"Processing {listpart}@{domainpart}")
+            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(30)) as client:
+                results = await asyncio.gather(*[weekly_stats(client, listpart, domainpart, date) for date in months])
+                for result in results:
+                    ct, weeks = result
+                    add_weeks(mls[ml]["weekly"], weeks)
+                    mlct += ct
                     for week in weeks:
                         if week >= after:
-                            mls[ml]['quarterly'][0] += weeks[week]
+                            mls[ml]["quarterly"][0] += weeks[week]
                         elif week >= wayafter:
-                            mls[ml]['quarterly'][1] += weeks[week]
-#                tsprint("Debug: %s has %u mails (%u)" % (key, ct, time.time() - begin)) # total for month
-                mlct += ct
-            except urllib.error.HTTPError as err:
-                if err.code == 404: # Can happen for new lists
-                    tsprint("Warn: could not open %s - %s" % (key, err.reason))
-                else:
-                    tsprint(err)
-            except KeyboardInterrupt: # intercept the handlers signal so we can report it
-                tsprint("Interrupted processing of %s" % key)
-                raise # propagate, so does not get confused with graceful stop
-            except Exception as err:
-                tsprint(err)
-            if interrupted: # break at end of file
-                tsprint("Stopping after processing %s" % key)
-                break
-    except KeyboardInterrupt: # catch the handlers signal
-        tsprint("Interrupted processing of %s" % ml)
-
-    if interrupted:
-        break
-
-    tsprint("Info: %s has %u mails (%u secs)" % (ml, mlct, time.time() - start)) # total for mail group
-    now = time.time()
-    if now - lastCheckpoint > 600: # checkpoint every 10 minutes
-        lastCheckpoint = now
-        tsprint("Creating checkpoint of JSON files")
-        with open(__MAILDATA_EXTENDED,'w+') as f:
-            json.dump(mls, f, indent=1) # sort_keys is expensive
-        with open(__MAILDATA_CACHE,"w") as f:
-            json.dump(mldcache, f, indent=1) # sort_keys is expensive
-
-tsprint("Completed scanning, writing JSON files (%s)" % str(interrupted))
-with open(__MAILDATA_EXTENDED,'w+') as f:
-    json.dump(mls, f, indent=1, sort_keys=True)
-
-# all the possible lists and dates
-found = [ ml + "-" + date for ml in mlists for date in months]
-obsolete = mldcache.keys() - found # drop any left over
-for key in obsolete:
-    tsprint("Dropping unused cache entry: " + key)
-    del mldcache[key]
-with open(__MAILDATA_CACHE,"w") as f:
-    json.dump(mldcache, f, indent=1, sort_keys=True)
-tsprint("Dumped JSON files")
-elapsed = time.time()-startTime
-tsprint("Completed in %d seconds" % elapsed)
+                            mls[ml]["quarterly"][1] += weeks[week]
+
+        except KeyboardInterrupt:  # catch the handlers signal
+            tsprint("Interrupted processing of %s" % ml)
+
+        if interrupted:
+            break
+
+        tsprint("Info: %s has %u mails (%u secs)" % (ml, mlct, time.time() - start))  # total for mail group
+        now = time.time()
+        if now - lastCheckpoint > 120:  # checkpoint every 2 minutes
+            lastCheckpoint = now
+            tsprint("Creating checkpoint of JSON files")
+            with open(__MAILDATA_EXTENDED, "w+") as f:
+                json.dump(mls, f, indent=1)  # sort_keys is expensive
+            with open(__MAILDATA_CACHE, "w") as f:
+                json.dump(mldcache, f, indent=1)  # sort_keys is expensive
+
+    tsprint("Completed scanning, writing JSON files (%s)" % str(interrupted))
+    with open(__MAILDATA_EXTENDED, "w+") as f:
+        json.dump(mls, f, indent=1, sort_keys=True)
+
+    # all the possible lists and dates
+    found = [ml + "-" + date for ml in mlists for date in months]
+    obsolete = mldcache.keys() - found  # drop any left over
+    for key in obsolete:
+        tsprint("Dropping unused cache entry: " + key)
+        del mldcache[key]
+    with open(__MAILDATA_CACHE, "w") as f:
+        json.dump(mldcache, f, indent=1, sort_keys=True)
+    tsprint("Dumped JSON files")
+    elapsed = time.time() - startTime
+    tsprint("Completed in %d seconds" % elapsed)
+
+
+if __name__ == "__main__":
+    asyncio.run(gather_stats())
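Taken together, the new flow is: fetch the list index once via requests, then fan the per-month stats.json calls out with asyncio.gather inside a single aiohttp session, bucketing each message timestamp by week. A self-contained sketch of that pattern, with placeholder list/domain/month values and the response shape (an "emails" array whose entries carry an "epoch" timestamp) assumed from weekly_stats() above:

    import asyncio
    import aiohttp

    SECS_PER_WEEK = 604800

    async def month_weekly(client, listpart, domainpart, yyyy_mm):
        # One stats.json call for one list and one month, bucketed by week.
        url = ("https://lists.apache.org/api/stats.json"
               f"?list={listpart}&domain={domainpart}&d={yyyy_mm}")
        async with client.get(url) as resp:
            data = await resp.json()
        weekly = {}
        for mail in (data or {}).get("emails", []):
            ts = mail["epoch"]
            # Round up to the end of the week, as weekly_stats() does.
            rounded = ts - (ts % SECS_PER_WEEK) + SECS_PER_WEEK
            weekly[rounded] = weekly.get(rounded, 0) + 1
        return weekly

    async def main():
        months = ["2023-02", "2023-03", "2023-04"]  # placeholder months
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(30)) as client:
            results = await asyncio.gather(
                *[month_weekly(client, "dev", "httpd.apache.org", m) for m in months]
            )
        totals = {}
        for weekly in results:  # merge the per-month buckets, as add_weeks() does
            for week, count in weekly.items():
                totals[week] = totals.get(week, 0) + count
        print(totals)

    if __name__ == "__main__":
        asyncio.run(main())

One design note: asyncio.gather() fails fast by default, so a single bad month aborts the whole list; passing return_exceptions=True and skipping failed results would restore the per-month error tolerance the old synchronous loop had with its inner try/except.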