commit 14840ed2db075bbc1d0991b974becc3826a50969
Author: Karsten Loesing <karsten.loes...@gmx.net>
Date:   Fri Aug 14 13:46:12 2015 +0200
Create new clients module from metrics-task #8462. --- detector/.gitignore | 2 - detector/country_info.py | 252 --------- detector/detector.py | 437 --------------- detector/detector.sh | 6 - modules/clients/.gitignore | 2 + modules/clients/build.xml | 44 ++ modules/clients/country_info.py | 252 +++++++++ modules/clients/detector.py | 437 +++++++++++++++ modules/clients/init-userstats.sql | 575 ++++++++++++++++++++ modules/clients/merge-clients.R | 19 + .../src/org/torproject/metrics/clients/Main.java | 465 ++++++++++++++++ modules/clients/test-userstats.sql | 478 ++++++++++++++++ modules/clients/userstats-detector.R | 18 + shared/bin/80-run-clients-stats.sh | 30 + shared/bin/99-copy-stats-files.sh | 1 + 15 files changed, 2321 insertions(+), 697 deletions(-) diff --git a/detector/.gitignore b/detector/.gitignore deleted file mode 100644 index 29a7166..0000000 --- a/detector/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.csv - diff --git a/detector/country_info.py b/detector/country_info.py deleted file mode 100644 index e23728e..0000000 --- a/detector/country_info.py +++ /dev/null @@ -1,252 +0,0 @@ -# -*- coding: utf-8 -*- - -countries = { - "ad" : "Andorra", - "ae" : "the United Arab Emirates", - "af" : "Afghanistan", - "ag" : "Antigua and Barbuda", - "ai" : "Anguilla", - "al" : "Albania", - "am" : "Armenia", - "an" : "the Netherlands Antilles", - "ao" : "Angola", - "aq" : "Antarctica", - "ar" : "Argentina", - "as" : "American Samoa", - "at" : "Austria", - "au" : "Australia", - "aw" : "Aruba", - "ax" : "the Aland Islands", - "az" : "Azerbaijan", - "ba" : "Bosnia and Herzegovina", - "bb" : "Barbados", - "bd" : "Bangladesh", - "be" : "Belgium", - "bf" : "Burkina Faso", - "bg" : "Bulgaria", - "bh" : "Bahrain", - "bi" : "Burundi", - "bj" : "Benin", - "bl" : "Saint Bartelemey", - "bm" : "Bermuda", - "bn" : "Brunei", - "bo" : "Bolivia", - "br" : "Brazil", - "bs" : "the Bahamas", - "bt" : "Bhutan", - "bv" : "the Bouvet Island", - "bw" : "Botswana", - "by" : "Belarus", - "bz" : "Belize", - "ca" : "Canada", - "cc" : "the Cocos (Keeling) Islands", - "cd" : "the Democratic Republic of the Congo", - "cf" : "Central African Republic", - "cg" : "Congo", - "ch" : "Switzerland", - "ci" : u"Côte d'Ivoire", - "ck" : "the Cook Islands", - "cl" : "Chile", - "cm" : "Cameroon", - "cn" : "China", - "co" : "Colombia", - "cr" : "Costa Rica", - "cu" : "Cuba", - "cv" : "Cape Verde", - "cx" : "the Christmas Island", - "cy" : "Cyprus", - "cz" : "the Czech Republic", - "de" : "Germany", - "dj" : "Djibouti", - "dk" : "Denmark", - "dm" : "Dominica", - "do" : "the Dominican Republic", - "dz" : "Algeria", - "ec" : "Ecuador", - "ee" : "Estonia", - "eg" : "Egypt", - "eh" : "the Western Sahara", - "er" : "Eritrea", - "es" : "Spain", - "et" : "Ethiopia", - "fi" : "Finland", - "fj" : "Fiji", - "fk" : "the Falkland Islands (Malvinas)", - "fm" : "the Federated States of Micronesia", - "fo" : "the Faroe Islands", - "fr" : "France", - "fx" : "Metropolitan France", - "ga" : "Gabon", - "gb" : "the United Kingdom", - "gd" : "Grenada", - "ge" : "Georgia", - "gf" : "French Guiana", - "gg" : "Guernsey", - "gh" : "Ghana", - "gi" : "Gibraltar", - "gl" : "Greenland", - "gm" : "Gambia", - "gn" : "Guinea", - "gp" : "Guadeloupe", - "gq" : "Equatorial Guinea", - "gr" : "Greece", - "gs" : "South Georgia and the South Sandwich Islands", - "gt" : "Guatemala", - "gu" : "Guam", - "gw" : "Guinea-Bissau", - "gy" : "Guyana", - "hk" : "Hong Kong", - "hm" : "Heard Island and McDonald Islands", - "hn" : "Honduras", - "hr" : "Croatia", - "ht" : "Haiti", - 
"hu" : "Hungary", - "id" : "Indonesia", - "ie" : "Ireland", - "il" : "Israel", - "im" : "the Isle of Man", - "in" : "India", - "io" : "the British Indian Ocean Territory", - "iq" : "Iraq", - "ir" : "Iran", - "is" : "Iceland", - "it" : "Italy", - "je" : "Jersey", - "jm" : "Jamaica", - "jo" : "Jordan", - "jp" : "Japan", - "ke" : "Kenya", - "kg" : "Kyrgyzstan", - "kh" : "Cambodia", - "ki" : "Kiribati", - "km" : "Comoros", - "kn" : "Saint Kitts and Nevis", - "kp" : "North Korea", - "kr" : "the Republic of Korea", - "kw" : "Kuwait", - "ky" : "the Cayman Islands", - "kz" : "Kazakhstan", - "la" : "Laos", - "lb" : "Lebanon", - "lc" : "Saint Lucia", - "li" : "Liechtenstein", - "lk" : "Sri Lanka", - "lr" : "Liberia", - "ls" : "Lesotho", - "lt" : "Lithuania", - "lu" : "Luxembourg", - "lv" : "Latvia", - "ly" : "Libya", - "ma" : "Morocco", - "mc" : "Monaco", - "md" : "the Republic of Moldova", - "me" : "Montenegro", - "mf" : "Saint Martin", - "mg" : "Madagascar", - "mh" : "the Marshall Islands", - "mk" : "Macedonia", - "ml" : "Mali", - "mm" : "Burma", - "mn" : "Mongolia", - "mo" : "Macau", - "mp" : "the Northern Mariana Islands", - "mq" : "Martinique", - "mr" : "Mauritania", - "ms" : "Montserrat", - "mt" : "Malta", - "mu" : "Mauritius", - "mv" : "the Maldives", - "mw" : "Malawi", - "mx" : "Mexico", - "my" : "Malaysia", - "mz" : "Mozambique", - "na" : "Namibia", - "nc" : "New Caledonia", - "ne" : "Niger", - "nf" : "Norfolk Island", - "ng" : "Nigeria", - "ni" : "Nicaragua", - "nl" : "the Netherlands", - "no" : "Norway", - "np" : "Nepal", - "nr" : "Nauru", - "nu" : "Niue", - "nz" : "New Zealand", - "om" : "Oman", - "pa" : "Panama", - "pe" : "Peru", - "pf" : "French Polynesia", - "pg" : "Papua New Guinea", - "ph" : "the Philippines", - "pk" : "Pakistan", - "pl" : "Poland", - "pm" : "Saint Pierre and Miquelon", - "pn" : "the Pitcairn Islands", - "pr" : "Puerto Rico", - "ps" : "the Palestinian Territory", - "pt" : "Portugal", - "pw" : "Palau", - "py" : "Paraguay", - "qa" : "Qatar", - "re" : "Reunion", - "ro" : "Romania", - "rs" : "Serbia", - "ru" : "Russia", - "rw" : "Rwanda", - "sa" : "Saudi Arabia", - "sb" : "the Solomon Islands", - "sc" : "the Seychelles", - "sd" : "Sudan", - "se" : "Sweden", - "sg" : "Singapore", - "sh" : "Saint Helena", - "si" : "Slovenia", - "sj" : "Svalbard and Jan Mayen", - "sk" : "Slovakia", - "sl" : "Sierra Leone", - "sm" : "San Marino", - "sn" : "Senegal", - "so" : "Somalia", - "sr" : "Suriname", - "ss" : "South Sudan", - "st" : u"São Tomé and PrÃncipe", - "sv" : "El Salvador", - "sy" : "the Syrian Arab Republic", - "sz" : "Swaziland", - "tc" : "Turks and Caicos Islands", - "td" : "Chad", - "tf" : "the French Southern Territories", - "tg" : "Togo", - "th" : "Thailand", - "tj" : "Tajikistan", - "tk" : "Tokelau", - "tl" : "East Timor", - "tm" : "Turkmenistan", - "tn" : "Tunisia", - "to" : "Tonga", - "tr" : "Turkey", - "tt" : "Trinidad and Tobago", - "tv" : "Tuvalu", - "tw" : "Taiwan", - "tz" : "the United Republic of Tanzania", - "ua" : "Ukraine", - "ug" : "Uganda", - "um" : "the United States Minor Outlying Islands", - "us" : "the United States", - "uy" : "Uruguay", - "uz" : "Uzbekistan", - "va" : "Vatican City", - "vc" : "Saint Vincent and the Grenadines", - "ve" : "Venezuela", - "vg" : "the British Virgin Islands", - "vi" : "the United States Virgin Islands", - "vn" : "Vietnam", - "vu" : "Vanuatu", - "wf" : "Wallis and Futuna", - "ws" : "Samoa", - "ye" : "Yemen", - "yt" : "Mayotte", - "za" : "South Africa", - "zm" : "Zambia", - "zw" : "Zimbabwe" - } diff --git 
a/detector/detector.py b/detector/detector.py deleted file mode 100644 index 611f25b..0000000 --- a/detector/detector.py +++ /dev/null @@ -1,437 +0,0 @@ -## Copyright (c) 2011 George Danezis <gd...@microsoft.com> -## -## All rights reserved. -## -## Redistribution and use in source and binary forms, with or without -## modification, are permitted (subject to the limitations in the -## disclaimer below) provided that the following conditions are met: -## -## * Redistributions of source code must retain the above copyright -## notice, this list of conditions and the following disclaimer. -## -## * Redistributions in binary form must reproduce the above copyright -## notice, this list of conditions and the following disclaimer in the -## documentation and/or other materials provided with the -## distribution. -## -## * Neither the name of <Owner Organization> nor the names of its -## contributors may be used to endorse or promote products derived -## from this software without specific prior written permission. -## -## NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE -## GRANTED BY THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT -## HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED -## WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -## MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -## DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -## LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -## BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN -## IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -## -## (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license) - -## This script reads a .csv file of the number of Tor users and finds -## anomalies that might be indicative of censorship. - -# Dep: matplotlib -from pylab import * -import matplotlib - -# Dep: numpy -import numpy - -# Dep: scipy -import scipy.stats -from scipy.stats.distributions import norm -from scipy.stats.distributions import poisson - -# Std lib -from datetime import date -from datetime import timedelta -import os.path - -# Country code -> Country names -import country_info - -# write utf8 to file -import codecs - -days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] - -def get_country_name_from_cc(country_code): - if (country_code.lower() in country_info.countries): - return country_info.countries[country_code.lower()] - return country_code # if we didn't find the cc in our map - -""" -Represents a .csv file containing information on the number of -connecting Tor users per country. - -'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value. - <country code> can also be "date"... -'all_dates': List of the data intervals (with default timedelta: 1 day). -'country_codes': List of all relevant country codes. -'MAX_INDEX': Length of store, number of country codes etc. -'date_min': The oldest date found in the .csv. -'date_min': The latest date found in the .csv. 
-""" -class torstatstore: - def __init__(self, file_name): - f = file(file_name) - country_codes = f.readline() - country_codes = country_codes.strip().split(",") - - store = {} - MAX_INDEX = 0 - for i, line in enumerate(f): - MAX_INDEX += 1 - line_parsed = line.strip().split(",") - for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)): - processed_val = None - if ccode == "date": - try: - year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10]) - processed_val = date(year, month, day) - except Exception, e: - print "Parsing error (ignoring line %s):" % j - print "%s" % val,e - break - - elif val != "NA": - processed_val = int(val) - store[(ccode, i)] = processed_val - - # min and max - date_min = store[("date", 0)] - date_max = store[("date", i)] - - all_dates = [] - d = date_min - dt = timedelta(days=1) - while d <= date_max: - all_dates += [d] - d = d + dt - - # Save for later - self.store = store - self.all_dates = all_dates - self.country_codes = country_codes - self.MAX_INDEX = MAX_INDEX - self.date_min = date_min - self.date_max = date_max - - """Return a list representing a time series of 'ccode' with respect - to the number of connected users. - """ - def get_country_series(self, ccode): - assert ccode in self.country_codes - series = {} - for d in self.all_dates: - series[d] = None - for i in range(self.MAX_INDEX): - series[self.store[("date", i)]] = self.store[(ccode, i)] - sx = [] - for d in self.all_dates: - sx += [series[d]] - return sx - - """Return an ordered list containing tuples of the form (<number of - users>, <country code>). The list is ordered with respect to the - number of users for each country. - """ - def get_largest(self, number): - exclude = set(["all", "??", "date"]) - l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude] - l.sort() - l.reverse() - return l[:number] - - """Return a dictionary, with <country code> as key, and the time - series of the country code as the value. - """ - def get_largest_locations(self, number): - l = self.get_largest(number) - res = {} - for _, ccode in l[:number]: - res[ccode] = self.get_country_series(ccode) - return res - -"""Return a list containing lists (?) 
where each such list contains -the difference in users for a time delta of 'days' -""" -def n_day_rel(series, days): - rel = [] - for i, v in enumerate(series): - if series[i] is None: - rel += [None] - continue - - if i - days < 0 or series[i-days] is None or series[i-days] == 0: - rel += [None] - else: - rel += [ float(series[i]) / series[i-days]] - return rel - -# Main model: computes the expected min / max range of number of users -def make_tendencies_minmax(l, INTERVAL = 1): - lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l]) - c = lminus1[lminus1.keys()[0]] - dists = [] - minx = [] - maxx = [] - for i in range(len(c)): - vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None] - if len(vals) < 8: - dists += [None] - minx += [None] - maxx += [None] - else: - vals.sort() - median = vals[len(vals)/2] - q1 = vals[len(vals)/4] - q2 = vals[(3*len(vals))/4] - qd = q2 - q1 - vals = [v for v in vals if median - qd*4 < v and v < median + qd*4] - if len(vals) < 8: - dists += [None] - minx += [None] - maxx += [None] - continue - mu, signma = norm.fit(vals) - dists += [(mu, signma)] - maxx += [norm.ppf(0.9999, mu, signma)] - minx += [norm.ppf(1 - 0.9999, mu, signma)] - ## print minx[-1], maxx[-1] - return minx, maxx - -# Makes pretty plots -def raw_plot(series, minc, maxc, labels, xtitle): - assert len(xtitle) == 3 - fname, stitle, slegend = xtitle - - font = {'family' : 'Bitstream Vera Sans', - 'weight' : 'normal', - 'size' : 8} - matplotlib.rc('font', **font) - - ylim( (-max(series)*0.1, max(series)*1.1) ) - plot(labels, series, linewidth=1.0, label="Users") - - wherefill = [] - for mm,mx in zip(minc, maxc): - wherefill += [not (mm == None and mx == None)] - assert mm < mx or (mm == None and mx == None) - - fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction") - - vdown = [] - vup = [] - for i,v in enumerate(series): - if minc[i] != None and v < minc[i]: - vdown += [v] - vup += [None] - elif maxc[i] != None and v > maxc[i]: - vdown += [None] - vup += [v] - else: - vup += [None] - vdown += [None] - - plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns") - plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns") - - legend(loc=2) - - xlabel('Time (days)') - ylabel('Users') - title(stitle) - grid(True) - F = gcf() - - F.set_size_inches(10,5) - F.savefig(fname, format="png", dpi = (150)) - close() - -def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle): - in_minc = [] - in_maxc = [] - for i, v in enumerate(series): - if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: - in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])] - in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])] - if not in_minc[-1] < in_maxc[-1]: - print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i] - assert in_minc[-1] < in_maxc[-1] - else: - in_minc += [None] - in_maxc += [None] - raw_plot(series, in_minc, in_maxc, labels, xtitle) - -"""Return the number of downscores and upscores of a time series -'series', given tendencies 'minc' and 'maxc' for the time interval -'INTERVAL'. - -If 'scoring_interval' is specifed we only consider upscore/downscore -that happened in the latest 'scoring_interval' days. 
-""" -def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None): - upscore = 0 - downscore = 0 - - if scoring_interval is None: - scoring_interval = len(series) - assert(len(series) >= scoring_interval) - - for i, v in enumerate(series): - if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: - in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL]) - in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL]) - if (i >= (len(series) - scoring_interval)): - downscore += 1 if minc[i] != None and v < in_minc else 0 - upscore += 1 if maxc[i] != None and v > in_maxc else 0 - - return downscore, upscore - -def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7): - ctarget = tss.get_country_series(TARGET) - c = n_day_rel(ctarget, INTERV) - absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle) - -def write_censorship_report_prologue(report_file, dates, notification_period): - if (notification_period == 1): - date_str = "%s" % (dates[-1]) # no need for date range if it's just one day - else: - date_str = "%s to %s" % (dates[-notification_period], dates[-1]) - - prologue = "=======================\n" - prologue += "Automatic Censorship Report for %s\n" % (date_str) - prologue += "=======================\n\n" - report_file.write(prologue) - -## Make a league table of censorship + nice graphs -def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"): - rdir = os.path.realpath(rdir) - if not os.path.exists(rdir) or not os.path.isdir(rdir): - print "ERROR: %s does not exist or is not a directory." % rdir - return - - summary_file = file(os.path.join(rdir, "summary.txt"), "w") - - if DAYS == None: - DAYS = 6*31 - - s = tss.get_largest(200) - scores = [] - for num, li in s: - print ".", - ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV) - # print ds, us - scores += [(ds,num, us, li)] - scores.sort() - scores.reverse() - s = "\n=======================\n" - s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1]) - s+= "=======================\n" - print s - summary_file.write(s) - for a,nx, b,c in scores: - if a > 0: - s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx) - print s - summary_file.write(s + "\n") - xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"") - plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV) - summary_file.close() - -"""Write a CSV report on the minimum/maximum users of each country per date.""" -def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7): - ranges_file = file(RANGES_FILE, "w") - ranges_file.write("date,country,minusers,maxusers\n") - exclude = set(["all", "??", "date"]) - for c in tss.country_codes: - if c in exclude: - continue - series = tss.get_country_series(c) - for i, v in enumerate(series): - if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: - minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL]) - maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL]) - if not minv < maxv: - print minv, maxv, series[i-INTERVAL], minc[i], maxc[i] - assert minv < maxv - ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv)) - ranges_file.close() - -"""Return a URL that points to a graph in metrics.tpo that displays -the 
number of direct Tor users in country 'country_code', for a -'period'-days period. - -Let's hope that the metrics.tpo URL scheme doesn't change often. -""" -def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period): - url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \ - (dates[-period], dates[-1], country_code) - return url - -"""Write a file containing a short censorship report over the last -'notification_period' days. -""" -def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None): - if notification_period is None: - notification_period = DAYS - - report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8') - file_prologue_written = False - - s = tss.get_largest(None) # no restrictions, get 'em all. - scores = [] - for num, li in s: - ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period) - scores += [(ds,num, us, li)] - scores.sort() - scores.reverse() - - for downscores,users_n,upscores,country_code in scores: - if (downscores > 0) or (upscores > 0): - if not file_prologue_written: - write_censorship_report_prologue(report_file, tss.all_dates, notification_period) - file_prologue_written = True - - if ((upscores > 0) and (downscores == 0)): - s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \ - (get_country_name_from_cc(country_code), upscores, users_n) - else: - s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \ - (downscores, get_country_name_from_cc(country_code), users_n, upscores) - - # Also give out a link for the appropriate usage graph for a 90-days period. - s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90) - - report_file.write(s + "\n") - - report_file.close() - -# INTERV is the time interval to model connection rates; -# consider maximum DAYS days back. 
-def detect(CSV_FILE = "userstats-detector.csv", - RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img", - INTERV = 7, DAYS = 6 * 31, REPORT = True): - tss = torstatstore(CSV_FILE) - l = tss.get_largest_locations(50) - minx, maxx = make_tendencies_minmax(l, INTERV) - #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR) - write_all(tss, minx, maxx, RANGES_FILE, INTERV) - - if REPORT: - # Make our short report; only consider events of the last day - write_ml_report(tss, minx, maxx, INTERV, DAYS, 1) - -def main(): - detect() - -if __name__ == "__main__": - main() diff --git a/detector/detector.sh b/detector/detector.sh deleted file mode 100755 index 56f6886..0000000 --- a/detector/detector.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -wget -qO direct-users.csv --no-check-certificate https://metrics.torproject.org/csv/direct-users.csv -wget -qO userstats-detector.csv --no-check-certificate https://metrics.torproject.org/csv/userstats-detector.csv -python detector.py -cat short_censorship_report.txt | mail -E -s 'Possible censorship events' tor-censorship-eve...@lists.torproject.org - diff --git a/modules/clients/.gitignore b/modules/clients/.gitignore new file mode 100644 index 0000000..29a7166 --- /dev/null +++ b/modules/clients/.gitignore @@ -0,0 +1,2 @@ +*.csv + diff --git a/modules/clients/build.xml b/modules/clients/build.xml new file mode 100644 index 0000000..f90e138 --- /dev/null +++ b/modules/clients/build.xml @@ -0,0 +1,44 @@ +<project default="run" name="clients" basedir="."> + + <property name="sources" value="src"/> + <property name="classes" value="classes"/> + <path id="classpath"> + <pathelement path="${classes}"/> + <fileset dir="/usr/share/java"> + <include name="commons-codec-1.6.jar"/> + <include name="commons-compress-1.4.1.jar"/> + <include name="commons-lang-2.6.jar"/> + </fileset> + <fileset dir="../../deps/metrics-lib"> + <include name="descriptor.jar"/> + </fileset> + </path> + + <target name="metrics-lib"> + <ant dir="../../deps/metrics-lib"/> + </target> + + <target name="compile" depends="metrics-lib"> + <mkdir dir="${classes}"/> + <javac destdir="${classes}" + srcdir="${sources}" + source="1.6" + target="1.6" + debug="true" + deprecation="true" + optimize="false" + failonerror="true" + includeantruntime="false"> + <classpath refid="classpath"/> + </javac> + </target> + + <target name="run" depends="compile"> + <java fork="true" + maxmemory="2g" + classname="org.torproject.metrics.clients.Main"> + <classpath refid="classpath"/> + </java> + </target> +</project> + diff --git a/modules/clients/country_info.py b/modules/clients/country_info.py new file mode 100644 index 0000000..e23728e --- /dev/null +++ b/modules/clients/country_info.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- + +countries = { + "ad" : "Andorra", + "ae" : "the United Arab Emirates", + "af" : "Afghanistan", + "ag" : "Antigua and Barbuda", + "ai" : "Anguilla", + "al" : "Albania", + "am" : "Armenia", + "an" : "the Netherlands Antilles", + "ao" : "Angola", + "aq" : "Antarctica", + "ar" : "Argentina", + "as" : "American Samoa", + "at" : "Austria", + "au" : "Australia", + "aw" : "Aruba", + "ax" : "the Aland Islands", + "az" : "Azerbaijan", + "ba" : "Bosnia and Herzegovina", + "bb" : "Barbados", + "bd" : "Bangladesh", + "be" : "Belgium", + "bf" : "Burkina Faso", + "bg" : "Bulgaria", + "bh" : "Bahrain", + "bi" : "Burundi", + "bj" : "Benin", + "bl" : "Saint Bartelemey", + "bm" : "Bermuda", + "bn" : "Brunei", + "bo" : "Bolivia", + "br" : "Brazil", + "bs" : "the Bahamas", + "bt" : "Bhutan", + "bv" : 
"the Bouvet Island", + "bw" : "Botswana", + "by" : "Belarus", + "bz" : "Belize", + "ca" : "Canada", + "cc" : "the Cocos (Keeling) Islands", + "cd" : "the Democratic Republic of the Congo", + "cf" : "Central African Republic", + "cg" : "Congo", + "ch" : "Switzerland", + "ci" : u"Côte d'Ivoire", + "ck" : "the Cook Islands", + "cl" : "Chile", + "cm" : "Cameroon", + "cn" : "China", + "co" : "Colombia", + "cr" : "Costa Rica", + "cu" : "Cuba", + "cv" : "Cape Verde", + "cx" : "the Christmas Island", + "cy" : "Cyprus", + "cz" : "the Czech Republic", + "de" : "Germany", + "dj" : "Djibouti", + "dk" : "Denmark", + "dm" : "Dominica", + "do" : "the Dominican Republic", + "dz" : "Algeria", + "ec" : "Ecuador", + "ee" : "Estonia", + "eg" : "Egypt", + "eh" : "the Western Sahara", + "er" : "Eritrea", + "es" : "Spain", + "et" : "Ethiopia", + "fi" : "Finland", + "fj" : "Fiji", + "fk" : "the Falkland Islands (Malvinas)", + "fm" : "the Federated States of Micronesia", + "fo" : "the Faroe Islands", + "fr" : "France", + "fx" : "Metropolitan France", + "ga" : "Gabon", + "gb" : "the United Kingdom", + "gd" : "Grenada", + "ge" : "Georgia", + "gf" : "French Guiana", + "gg" : "Guernsey", + "gh" : "Ghana", + "gi" : "Gibraltar", + "gl" : "Greenland", + "gm" : "Gambia", + "gn" : "Guinea", + "gp" : "Guadeloupe", + "gq" : "Equatorial Guinea", + "gr" : "Greece", + "gs" : "South Georgia and the South Sandwich Islands", + "gt" : "Guatemala", + "gu" : "Guam", + "gw" : "Guinea-Bissau", + "gy" : "Guyana", + "hk" : "Hong Kong", + "hm" : "Heard Island and McDonald Islands", + "hn" : "Honduras", + "hr" : "Croatia", + "ht" : "Haiti", + "hu" : "Hungary", + "id" : "Indonesia", + "ie" : "Ireland", + "il" : "Israel", + "im" : "the Isle of Man", + "in" : "India", + "io" : "the British Indian Ocean Territory", + "iq" : "Iraq", + "ir" : "Iran", + "is" : "Iceland", + "it" : "Italy", + "je" : "Jersey", + "jm" : "Jamaica", + "jo" : "Jordan", + "jp" : "Japan", + "ke" : "Kenya", + "kg" : "Kyrgyzstan", + "kh" : "Cambodia", + "ki" : "Kiribati", + "km" : "Comoros", + "kn" : "Saint Kitts and Nevis", + "kp" : "North Korea", + "kr" : "the Republic of Korea", + "kw" : "Kuwait", + "ky" : "the Cayman Islands", + "kz" : "Kazakhstan", + "la" : "Laos", + "lb" : "Lebanon", + "lc" : "Saint Lucia", + "li" : "Liechtenstein", + "lk" : "Sri Lanka", + "lr" : "Liberia", + "ls" : "Lesotho", + "lt" : "Lithuania", + "lu" : "Luxembourg", + "lv" : "Latvia", + "ly" : "Libya", + "ma" : "Morocco", + "mc" : "Monaco", + "md" : "the Republic of Moldova", + "me" : "Montenegro", + "mf" : "Saint Martin", + "mg" : "Madagascar", + "mh" : "the Marshall Islands", + "mk" : "Macedonia", + "ml" : "Mali", + "mm" : "Burma", + "mn" : "Mongolia", + "mo" : "Macau", + "mp" : "the Northern Mariana Islands", + "mq" : "Martinique", + "mr" : "Mauritania", + "ms" : "Montserrat", + "mt" : "Malta", + "mu" : "Mauritius", + "mv" : "the Maldives", + "mw" : "Malawi", + "mx" : "Mexico", + "my" : "Malaysia", + "mz" : "Mozambique", + "na" : "Namibia", + "nc" : "New Caledonia", + "ne" : "Niger", + "nf" : "Norfolk Island", + "ng" : "Nigeria", + "ni" : "Nicaragua", + "nl" : "the Netherlands", + "no" : "Norway", + "np" : "Nepal", + "nr" : "Nauru", + "nu" : "Niue", + "nz" : "New Zealand", + "om" : "Oman", + "pa" : "Panama", + "pe" : "Peru", + "pf" : "French Polynesia", + "pg" : "Papua New Guinea", + "ph" : "the Philippines", + "pk" : "Pakistan", + "pl" : "Poland", + "pm" : "Saint Pierre and Miquelon", + "pn" : "the Pitcairn Islands", + "pr" : "Puerto Rico", + "ps" : "the Palestinian Territory", + "pt" : 
"Portugal", + "pw" : "Palau", + "py" : "Paraguay", + "qa" : "Qatar", + "re" : "Reunion", + "ro" : "Romania", + "rs" : "Serbia", + "ru" : "Russia", + "rw" : "Rwanda", + "sa" : "Saudi Arabia", + "sb" : "the Solomon Islands", + "sc" : "the Seychelles", + "sd" : "Sudan", + "se" : "Sweden", + "sg" : "Singapore", + "sh" : "Saint Helena", + "si" : "Slovenia", + "sj" : "Svalbard and Jan Mayen", + "sk" : "Slovakia", + "sl" : "Sierra Leone", + "sm" : "San Marino", + "sn" : "Senegal", + "so" : "Somalia", + "sr" : "Suriname", + "ss" : "South Sudan", + "st" : u"São Tomé and PrÃncipe", + "sv" : "El Salvador", + "sy" : "the Syrian Arab Republic", + "sz" : "Swaziland", + "tc" : "Turks and Caicos Islands", + "td" : "Chad", + "tf" : "the French Southern Territories", + "tg" : "Togo", + "th" : "Thailand", + "tj" : "Tajikistan", + "tk" : "Tokelau", + "tl" : "East Timor", + "tm" : "Turkmenistan", + "tn" : "Tunisia", + "to" : "Tonga", + "tr" : "Turkey", + "tt" : "Trinidad and Tobago", + "tv" : "Tuvalu", + "tw" : "Taiwan", + "tz" : "the United Republic of Tanzania", + "ua" : "Ukraine", + "ug" : "Uganda", + "um" : "the United States Minor Outlying Islands", + "us" : "the United States", + "uy" : "Uruguay", + "uz" : "Uzbekistan", + "va" : "Vatican City", + "vc" : "Saint Vincent and the Grenadines", + "ve" : "Venezuela", + "vg" : "the British Virgin Islands", + "vi" : "the United States Virgin Islands", + "vn" : "Vietnam", + "vu" : "Vanuatu", + "wf" : "Wallis and Futuna", + "ws" : "Samoa", + "ye" : "Yemen", + "yt" : "Mayotte", + "za" : "South Africa", + "zm" : "Zambia", + "zw" : "Zimbabwe" + } diff --git a/modules/clients/detector.py b/modules/clients/detector.py new file mode 100644 index 0000000..611f25b --- /dev/null +++ b/modules/clients/detector.py @@ -0,0 +1,437 @@ +## Copyright (c) 2011 George Danezis <gd...@microsoft.com> +## +## All rights reserved. +## +## Redistribution and use in source and binary forms, with or without +## modification, are permitted (subject to the limitations in the +## disclaimer below) provided that the following conditions are met: +## +## * Redistributions of source code must retain the above copyright +## notice, this list of conditions and the following disclaimer. +## +## * Redistributions in binary form must reproduce the above copyright +## notice, this list of conditions and the following disclaimer in the +## documentation and/or other materials provided with the +## distribution. +## +## * Neither the name of <Owner Organization> nor the names of its +## contributors may be used to endorse or promote products derived +## from this software without specific prior written permission. +## +## NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE +## GRANTED BY THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT +## HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED +## WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +## MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +## DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +## LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +## BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN +## IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+## +## (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license) + +## This script reads a .csv file of the number of Tor users and finds +## anomalies that might be indicative of censorship. + +# Dep: matplotlib +from pylab import * +import matplotlib + +# Dep: numpy +import numpy + +# Dep: scipy +import scipy.stats +from scipy.stats.distributions import norm +from scipy.stats.distributions import poisson + +# Std lib +from datetime import date +from datetime import timedelta +import os.path + +# Country code -> Country names +import country_info + +# write utf8 to file +import codecs + +days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] + +def get_country_name_from_cc(country_code): + if (country_code.lower() in country_info.countries): + return country_info.countries[country_code.lower()] + return country_code # if we didn't find the cc in our map + +""" +Represents a .csv file containing information on the number of +connecting Tor users per country. + +'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value. + <country code> can also be "date"... +'all_dates': List of the data intervals (with default timedelta: 1 day). +'country_codes': List of all relevant country codes. +'MAX_INDEX': Length of store, number of country codes etc. +'date_min': The oldest date found in the .csv. +'date_min': The latest date found in the .csv. +""" +class torstatstore: + def __init__(self, file_name): + f = file(file_name) + country_codes = f.readline() + country_codes = country_codes.strip().split(",") + + store = {} + MAX_INDEX = 0 + for i, line in enumerate(f): + MAX_INDEX += 1 + line_parsed = line.strip().split(",") + for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)): + processed_val = None + if ccode == "date": + try: + year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10]) + processed_val = date(year, month, day) + except Exception, e: + print "Parsing error (ignoring line %s):" % j + print "%s" % val,e + break + + elif val != "NA": + processed_val = int(val) + store[(ccode, i)] = processed_val + + # min and max + date_min = store[("date", 0)] + date_max = store[("date", i)] + + all_dates = [] + d = date_min + dt = timedelta(days=1) + while d <= date_max: + all_dates += [d] + d = d + dt + + # Save for later + self.store = store + self.all_dates = all_dates + self.country_codes = country_codes + self.MAX_INDEX = MAX_INDEX + self.date_min = date_min + self.date_max = date_max + + """Return a list representing a time series of 'ccode' with respect + to the number of connected users. + """ + def get_country_series(self, ccode): + assert ccode in self.country_codes + series = {} + for d in self.all_dates: + series[d] = None + for i in range(self.MAX_INDEX): + series[self.store[("date", i)]] = self.store[(ccode, i)] + sx = [] + for d in self.all_dates: + sx += [series[d]] + return sx + + """Return an ordered list containing tuples of the form (<number of + users>, <country code>). The list is ordered with respect to the + number of users for each country. + """ + def get_largest(self, number): + exclude = set(["all", "??", "date"]) + l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude] + l.sort() + l.reverse() + return l[:number] + + """Return a dictionary, with <country code> as key, and the time + series of the country code as the value. 
+ """ + def get_largest_locations(self, number): + l = self.get_largest(number) + res = {} + for _, ccode in l[:number]: + res[ccode] = self.get_country_series(ccode) + return res + +"""Return a list containing lists (?) where each such list contains +the difference in users for a time delta of 'days' +""" +def n_day_rel(series, days): + rel = [] + for i, v in enumerate(series): + if series[i] is None: + rel += [None] + continue + + if i - days < 0 or series[i-days] is None or series[i-days] == 0: + rel += [None] + else: + rel += [ float(series[i]) / series[i-days]] + return rel + +# Main model: computes the expected min / max range of number of users +def make_tendencies_minmax(l, INTERVAL = 1): + lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l]) + c = lminus1[lminus1.keys()[0]] + dists = [] + minx = [] + maxx = [] + for i in range(len(c)): + vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None] + if len(vals) < 8: + dists += [None] + minx += [None] + maxx += [None] + else: + vals.sort() + median = vals[len(vals)/2] + q1 = vals[len(vals)/4] + q2 = vals[(3*len(vals))/4] + qd = q2 - q1 + vals = [v for v in vals if median - qd*4 < v and v < median + qd*4] + if len(vals) < 8: + dists += [None] + minx += [None] + maxx += [None] + continue + mu, signma = norm.fit(vals) + dists += [(mu, signma)] + maxx += [norm.ppf(0.9999, mu, signma)] + minx += [norm.ppf(1 - 0.9999, mu, signma)] + ## print minx[-1], maxx[-1] + return minx, maxx + +# Makes pretty plots +def raw_plot(series, minc, maxc, labels, xtitle): + assert len(xtitle) == 3 + fname, stitle, slegend = xtitle + + font = {'family' : 'Bitstream Vera Sans', + 'weight' : 'normal', + 'size' : 8} + matplotlib.rc('font', **font) + + ylim( (-max(series)*0.1, max(series)*1.1) ) + plot(labels, series, linewidth=1.0, label="Users") + + wherefill = [] + for mm,mx in zip(minc, maxc): + wherefill += [not (mm == None and mx == None)] + assert mm < mx or (mm == None and mx == None) + + fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction") + + vdown = [] + vup = [] + for i,v in enumerate(series): + if minc[i] != None and v < minc[i]: + vdown += [v] + vup += [None] + elif maxc[i] != None and v > maxc[i]: + vdown += [None] + vup += [v] + else: + vup += [None] + vdown += [None] + + plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns") + plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns") + + legend(loc=2) + + xlabel('Time (days)') + ylabel('Users') + title(stitle) + grid(True) + F = gcf() + + F.set_size_inches(10,5) + F.savefig(fname, format="png", dpi = (150)) + close() + +def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle): + in_minc = [] + in_maxc = [] + for i, v in enumerate(series): + if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: + in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])] + in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])] + if not in_minc[-1] < in_maxc[-1]: + print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i] + assert in_minc[-1] < in_maxc[-1] + else: + in_minc += [None] + in_maxc += [None] + raw_plot(series, in_minc, in_maxc, labels, xtitle) + +"""Return the number of downscores and upscores of a time series +'series', given tendencies 'minc' and 'maxc' for the time interval +'INTERVAL'. 
+ +If 'scoring_interval' is specifed we only consider upscore/downscore +that happened in the latest 'scoring_interval' days. +""" +def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None): + upscore = 0 + downscore = 0 + + if scoring_interval is None: + scoring_interval = len(series) + assert(len(series) >= scoring_interval) + + for i, v in enumerate(series): + if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: + in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL]) + in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL]) + if (i >= (len(series) - scoring_interval)): + downscore += 1 if minc[i] != None and v < in_minc else 0 + upscore += 1 if maxc[i] != None and v > in_maxc else 0 + + return downscore, upscore + +def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7): + ctarget = tss.get_country_series(TARGET) + c = n_day_rel(ctarget, INTERV) + absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle) + +def write_censorship_report_prologue(report_file, dates, notification_period): + if (notification_period == 1): + date_str = "%s" % (dates[-1]) # no need for date range if it's just one day + else: + date_str = "%s to %s" % (dates[-notification_period], dates[-1]) + + prologue = "=======================\n" + prologue += "Automatic Censorship Report for %s\n" % (date_str) + prologue += "=======================\n\n" + report_file.write(prologue) + +## Make a league table of censorship + nice graphs +def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"): + rdir = os.path.realpath(rdir) + if not os.path.exists(rdir) or not os.path.isdir(rdir): + print "ERROR: %s does not exist or is not a directory." 
% rdir + return + + summary_file = file(os.path.join(rdir, "summary.txt"), "w") + + if DAYS == None: + DAYS = 6*31 + + s = tss.get_largest(200) + scores = [] + for num, li in s: + print ".", + ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV) + # print ds, us + scores += [(ds,num, us, li)] + scores.sort() + scores.reverse() + s = "\n=======================\n" + s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1]) + s+= "=======================\n" + print s + summary_file.write(s) + for a,nx, b,c in scores: + if a > 0: + s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx) + print s + summary_file.write(s + "\n") + xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"") + plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV) + summary_file.close() + +"""Write a CSV report on the minimum/maximum users of each country per date.""" +def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7): + ranges_file = file(RANGES_FILE, "w") + ranges_file.write("date,country,minusers,maxusers\n") + exclude = set(["all", "??", "date"]) + for c in tss.country_codes: + if c in exclude: + continue + series = tss.get_country_series(c) + for i, v in enumerate(series): + if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None: + minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL]) + maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL]) + if not minv < maxv: + print minv, maxv, series[i-INTERVAL], minc[i], maxc[i] + assert minv < maxv + ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv)) + ranges_file.close() + +"""Return a URL that points to a graph in metrics.tpo that displays +the number of direct Tor users in country 'country_code', for a +'period'-days period. + +Let's hope that the metrics.tpo URL scheme doesn't change often. +""" +def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period): + url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \ + (dates[-period], dates[-1], country_code) + return url + +"""Write a file containing a short censorship report over the last +'notification_period' days. +""" +def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None): + if notification_period is None: + notification_period = DAYS + + report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8') + file_prologue_written = False + + s = tss.get_largest(None) # no restrictions, get 'em all. 
+ scores = [] + for num, li in s: + ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period) + scores += [(ds,num, us, li)] + scores.sort() + scores.reverse() + + for downscores,users_n,upscores,country_code in scores: + if (downscores > 0) or (upscores > 0): + if not file_prologue_written: + write_censorship_report_prologue(report_file, tss.all_dates, notification_period) + file_prologue_written = True + + if ((upscores > 0) and (downscores == 0)): + s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \ + (get_country_name_from_cc(country_code), upscores, users_n) + else: + s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \ + (downscores, get_country_name_from_cc(country_code), users_n, upscores) + + # Also give out a link for the appropriate usage graph for a 90-days period. + s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90) + + report_file.write(s + "\n") + + report_file.close() + +# INTERV is the time interval to model connection rates; +# consider maximum DAYS days back. +def detect(CSV_FILE = "userstats-detector.csv", + RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img", + INTERV = 7, DAYS = 6 * 31, REPORT = True): + tss = torstatstore(CSV_FILE) + l = tss.get_largest_locations(50) + minx, maxx = make_tendencies_minmax(l, INTERV) + #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR) + write_all(tss, minx, maxx, RANGES_FILE, INTERV) + + if REPORT: + # Make our short report; only consider events of the last day + write_ml_report(tss, minx, maxx, INTERV, DAYS, 1) + +def main(): + detect() + +if __name__ == "__main__": + main() diff --git a/modules/clients/init-userstats.sql b/modules/clients/init-userstats.sql new file mode 100644 index 0000000..7c5df3d --- /dev/null +++ b/modules/clients/init-userstats.sql @@ -0,0 +1,575 @@ +-- Copyright 2013 The Tor Project +-- See LICENSE for licensing information + +-- Use enum types for dimensions that may only change if we write new code +-- to support them. For example, if there's a new node type beyond relay +-- and bridge, we'll have to write code to support it. This is in +-- contrast to dimensions like country, transport, or version which don't +-- have their possible values hard-coded anywhere. +CREATE TYPE node AS ENUM ('relay', 'bridge'); +CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status'); + +-- All new data first goes into the imported table. The import tool +-- should do some trivial checks for invalid or duplicate data, but +-- ultimately, we're going to do these checks in the database. For +-- example, the import tool could avoid importing data from the same +-- descriptor more than once, but it's fine to import the same history +-- string from distinct descriptors multiple times. The import tool must, +-- however, make sure that stats_end is not greater than 00:00:00 of the +-- day following stats_start. There are no constraints set on this table, +-- because importing data should be really, really fast. Once the newly +-- imported data is successfully processed, the imported table is emptied. +CREATE TABLE imported ( + + -- The 40-character upper-case hex string identifies a descriptor + -- uniquely and is used to join metrics (responses, bytes, status) + -- published by the same node (relay or bridge). + fingerprint CHARACTER(40) NOT NULL, + + -- The node type is used to decide the statistics that this entry will + -- be part of. 
+ node node NOT NULL, + + -- The metric of this entry describes the stored observation type. + -- We'll want to store different metrics published by a node: + -- - 'responses' are the number of v3 network status consensus requests + -- that the node responded to; + -- - 'bytes' are the number of bytes that the node wrote when answering + -- directory requests; + -- - 'status' are the intervals when the node was listed as running in + -- the network status published by either the directory authorities or + -- bridge authority. + metric metric NOT NULL, + + -- The two-letter lower-case country code that the observation in this + -- entry can be attributed to; can be '??' if no country information is + -- known for this entry, or '' (empty string) if this entry summarizes + -- observations for all countries. + country CHARACTER VARYING(2) NOT NULL, + + -- The pluggable transport name that the observation in this entry can + -- be attributed to; can be '<OR>' if no pluggable transport was used, + -- '<??>' if an unknown pluggable transport was used, or '' (empty + -- string) if this entry summarizes observations for all transports. + transport CHARACTER VARYING(20) NOT NULL, + + -- The IP address version that the observation in this entry can be + -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry + -- summarizes observations for all IP address versions. + version CHARACTER VARYING(2) NOT NULL, + + -- The interval start of this observation. + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + + -- The interval end of this observation. This timestamp must be greater + -- than stats_start and must not be greater than 00:00:00 of the day + -- following stats_start, which the import tool must make sure. + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + + -- Finally, the observed value. + val DOUBLE PRECISION NOT NULL +); + +-- After importing new data into the imported table, they are merged into +-- the merged table using the merge() function. The merged table contains +-- the same data as the imported table, except: +-- (1) there are no duplicate or overlapping entries in the merged table +-- with respect to stats_start and stats_end and the same fingerprint, +-- node, metric, country, transport, and version columns; +-- (2) all subsequent intervals with the same node, metric, country, +-- transport, version, and stats_start date are compressed into a +-- single entry. +CREATE TABLE merged ( + + -- The unique key that is only used when merging newly imported data + -- into this table. + id SERIAL PRIMARY KEY, + + -- All other columns have the same meaning as in the imported table. + fingerprint CHARACTER(40) NOT NULL, + node node NOT NULL, + metric metric NOT NULL, + country CHARACTER VARYING(2) NOT NULL, + transport CHARACTER VARYING(20) NOT NULL, + version CHARACTER VARYING(2) NOT NULL, + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + val DOUBLE PRECISION NOT NULL +); + +-- After merging new data into the merged table, they are aggregated to +-- daily user number estimates using the aggregate() function. Only dates +-- with new data in the imported table will be recomputed in the +-- aggregated table. The aggregated components follow the algorithm +-- proposed in Tor Tech Report 2012-10-001. +CREATE TABLE aggregated ( + + -- The date of these aggregated observations. + date DATE NOT NULL, + + -- The node, country, transport, and version columns all have the same + -- meaning as in the imported table. 
+ node node NOT NULL, + country CHARACTER VARYING(2) NOT NULL DEFAULT '', + transport CHARACTER VARYING(20) NOT NULL DEFAULT '', + version CHARACTER VARYING(2) NOT NULL DEFAULT '', + + -- Total number of reported responses, possibly broken down by country, + -- transport, or version if either of them is not ''. See r(R) in the + -- tech report. + rrx DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of seconds of nodes reporting responses, possibly broken + -- down by country, transport, or version if either of them is not ''. + -- This would be referred to as n(R) in the tech report, though it's not + -- used there. + nrx DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of reported bytes. See h(H) in the tech report. + hh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of seconds of nodes in the status. See n(N) in the tech + -- report. + nn DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of reported bytes of nodes that reported both responses and + -- bytes. See h(R intersect H) in the tech report. + hrh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of seconds of nodes reporting bytes. See n(H) in the tech + -- report. + nh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of seconds of nodes reporting responses but no bytes. See + -- n(R \ H) in the tech report. + nrh DOUBLE PRECISION NOT NULL DEFAULT 0 +); + +CREATE LANGUAGE plpgsql; + +-- Merge new entries from the imported table into the merged table, and +-- compress them while doing so. This function first executes a query to +-- match all entries in the imported table with adjacent or even +-- overlapping entries in the merged table. It then loops over query +-- results and either inserts or updates entries in the merged table. The +-- idea is to leave query optimization to the database and only touch +-- as few entries as possible while running this function. +CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$ +DECLARE + + -- The current record that we're handling in the loop body. + cur RECORD; + + -- Various information about the last record we processed, so that we + -- can merge the current record with the last one if possible. + last_fingerprint CHARACTER(40) := NULL; + last_node node; + last_metric metric; + last_country CHARACTER VARYING(2); + last_transport CHARACTER VARYING(20); + last_version CHARACTER VARYING(2); + last_start TIMESTAMP WITHOUT TIME ZONE; + last_end TIMESTAMP WITHOUT TIME ZONE; + last_id INTEGER; + last_val DOUBLE PRECISION; + + -- Interval end and value of the last record before updating them in the + -- last loop step. In a few edge cases, we may update an entry and + -- learn in the next loop step that the updated entry overlaps with the + -- subsequent entry. In these cases we'll have to undo the update, + -- which is why we're storing the updated values. + undo_end TIMESTAMP WITHOUT TIME ZONE; + undo_val DOUBLE PRECISION; + +BEGIN + RAISE NOTICE '% Starting to merge.', timeofday(); + + -- TODO Maybe we'll have to materialize a merged_part table that only + -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported) + -- and use that in the query below. + + -- Loop over results from a query that joins new entries in the imported + -- table with existing entries in the merged table. + FOR cur IN SELECT DISTINCT + + -- Select id, interval start and end, and value of the existing entry + -- in merged; all these fields may be null if the imported entry is + -- not adjacent to an existing one. 
+ merged.id AS merged_id, + merged.stats_start AS merged_start, + merged.stats_end AS merged_end, + merged.val AS merged_val, + + -- Select interval start and end and value of the newly imported + -- entry. + imported.stats_start AS imported_start, + imported.stats_end AS imported_end, + imported.val AS imported_val, + + -- Select columns that define the group of entries that can be merged + -- in the merged table. + imported.fingerprint AS fingerprint, + imported.node AS node, + imported.metric AS metric, + imported.country AS country, + imported.transport AS transport, + imported.version AS version + + -- Select these columns from all entries in the imported table, plus + -- do an outer join on the merged table to find adjacent entries that + -- we might want to merge the new entries with. It's possible that we + -- handle the same imported entry twice, if it starts directly after + -- one existing entry and ends directly before another existing entry. + FROM imported LEFT JOIN merged + + -- First two join conditions are to find adjacent intervals. In fact, + -- we also include overlapping intervals here, so that we can skip the + -- overlapping entry in the imported table. + ON imported.stats_end >= merged.stats_start AND + imported.stats_start <= merged.stats_end AND + + -- Further join conditions are same date, fingerprint, node, etc., + -- so that we don't merge entries that don't belong together. + DATE(imported.stats_start) = DATE(merged.stats_start) AND + imported.fingerprint = merged.fingerprint AND + imported.node = merged.node AND + imported.metric = merged.metric AND + imported.country = merged.country AND + imported.transport = merged.transport AND + imported.version = merged.version + + -- Ordering is key, or our approach to merge subsequent entries is + -- going to break. + ORDER BY imported.fingerprint, imported.node, imported.metric, + imported.country, imported.transport, imported.version, + imported.stats_start, merged.stats_start, imported.stats_end + + -- Now go through the results one by one. + LOOP + + -- Log that we're done with the query and about to start merging. + IF last_fingerprint IS NULL THEN + RAISE NOTICE '% Query returned, now merging entries.', timeofday(); + END IF; + + -- If we're processing the very first entry or if we have reached a + -- new group of entries that belong together, (re-)set last_* + -- variables. + IF last_fingerprint IS NULL OR + DATE(cur.imported_start) <> DATE(last_start) OR + cur.fingerprint <> last_fingerprint OR + cur.node <> last_node OR + cur.metric <> last_metric OR + cur.country <> last_country OR + cur.transport <> last_transport OR + cur.version <> last_version THEN + last_id := -1; + last_start := '1970-01-01 00:00:00'; + last_end := '1970-01-01 00:00:00'; + last_val := -1; + END IF; + + -- Remember all fields that determine the group of which entries + -- belong together. + last_fingerprint := cur.fingerprint; + last_node := cur.node; + last_metric := cur.metric; + last_country := cur.country; + last_transport := cur.transport; + last_version := cur.version; + + -- If the existing entry that we're currently looking at starts before + -- the previous entry ends, we have created two overlapping entries in + -- the last iteration, and that is not allowed. Undo the previous + -- change. 
+ IF cur.merged_start IS NOT NULL AND + cur.merged_start < last_end AND + undo_end IS NOT NULL AND undo_val IS NOT NULL THEN + UPDATE merged SET stats_end = undo_end, val = undo_val + WHERE id = last_id; + undo_end := NULL; + undo_val := NULL; + + -- If there is no adjacent entry to the one we're about to merge, + -- insert it as a new entry. + ELSIF cur.merged_end IS NULL THEN + IF cur.imported_start > last_end THEN + last_start := cur.imported_start; + last_end := cur.imported_end; + last_val := cur.imported_val; + INSERT INTO merged (fingerprint, node, metric, country, transport, + version, stats_start, stats_end, val) + VALUES (last_fingerprint, last_node, last_metric, last_country, + last_transport, last_version, last_start, last_end, + last_val) + RETURNING id INTO last_id; + + -- If there was no adjacent entry before starting to merge, but + -- there is now one ending right before the new entry starts, merge + -- the new entry into the existing one. + ELSIF cur.imported_start = last_end THEN + last_val := last_val + cur.imported_val; + last_end := cur.imported_end; + UPDATE merged SET stats_end = last_end, val = last_val + WHERE id = last_id; + END IF; + + -- There's no risk of this entry overlapping with the next. + undo_end := NULL; + undo_val := NULL; + + -- If the new entry ends right when an existing entry starts, but + -- there's a gap between when the previously processed entry ends and + -- when the new entry starts, merge the new entry with the existing + -- entry we're currently looking at. + ELSIF cur.imported_end = cur.merged_start THEN + IF cur.imported_start > last_end THEN + last_id := cur.merged_id; + last_start := cur.imported_start; + last_end := cur.merged_end; + last_val := cur.imported_val + cur.merged_val; + UPDATE merged SET stats_start = last_start, val = last_val + WHERE id = last_id; + + -- If the new entry ends right when an existing entry starts and + -- there's no gap between when the previously processed entry ends + -- and when the new entry starts, merge the new entry with the other + -- two entries. This happens by deleting the previous entry and + -- expanding the subsequent entry to cover all three entries. + ELSIF cur.imported_start = last_end THEN + DELETE FROM merged WHERE id = last_id; + last_id := cur.merged_id; + last_end := cur.merged_end; + last_val := last_val + cur.merged_val; + UPDATE merged SET stats_start = last_start, val = last_val + WHERE id = last_id; + END IF; + + -- There's no risk of this entry overlapping with the next. + undo_end := NULL; + undo_val := NULL; + + -- If the new entry starts right when an existing entry ends, but + -- there's a gap between the previously processed entry and the + -- existing one, extend the existing entry. There's a special case + -- when this operation is wrong and must be undone, which is when the + -- newly added entry overlaps with the subsequent entry. That's why + -- we have to store the old interval end and value, so that this + -- operation can be undone in the next loop iteration.
+ ELSIF cur.imported_start = cur.merged_end THEN + IF last_end < cur.imported_start THEN + undo_end := cur.merged_end; + undo_val := cur.merged_val; + last_id := cur.merged_id; + last_start := cur.merged_start; + last_end := cur.imported_end; + last_val := cur.merged_val + cur.imported_val; + UPDATE merged SET stats_end = last_end, val = last_val + WHERE id = last_id; + + -- If the new entry starts right when an existing entry ends and + -- there's no gap between the previously processed entry and the + -- existing entry, extend the existing entry. This is very similar + -- to the previous case. The same reasoning about possibly having + -- to undo this operation applies. + ELSE + undo_end := cur.merged_end; + undo_val := last_val; + last_end := cur.imported_end; + last_val := last_val + cur.imported_val; + UPDATE merged SET stats_end = last_end, val = last_val + WHERE id = last_id; + END IF; + + -- If none of the cases above applies, there must have been an overlap + -- between the new entry and an existing one. Skip the new entry. + ELSE + last_id := cur.merged_id; + last_start := cur.merged_start; + last_end := cur.merged_end; + last_val := cur.merged_val; + undo_end := NULL; + undo_val := NULL; + END IF; + END LOOP; + + -- That's it, we're done merging. + RAISE NOTICE '% Finishing merge.', timeofday(); + RETURN; +END; +$$ LANGUAGE plpgsql; + +-- Aggregate user estimates for all dates that have updated entries in the +-- merged table. This function first creates a temporary table with +-- new or updated observations, then removes all existing estimates for +-- the dates to be updated, and finally inserts newly computed aggregates +-- for these dates. +CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$ +BEGIN + RAISE NOTICE '% Starting aggregate step.', timeofday(); + + -- Create a new temporary table containing all relevant information + -- needed to update the aggregated table. In this table, we sum up all + -- observations of a given type by reporting node. This query is + -- (temporarily) materialized, because we need to combine its entries + -- multiple times in various ways. A (non-materialized) view would have + -- meant to re-compute this query multiple times. + CREATE TEMPORARY TABLE update AS + SELECT fingerprint, node, metric, country, transport, version, + DATE(stats_start), SUM(val) AS val, + SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start) + AS DOUBLE PRECISION)) AS seconds + FROM merged + WHERE DATE(stats_start) IN ( + SELECT DISTINCT DATE(stats_start) FROM imported) + GROUP BY fingerprint, node, metric, country, transport, version, + DATE(stats_start); + + -- Delete all entries from the aggregated table that we're about to + -- re-compute. + DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update); + + -- Insert partly empty results for all existing combinations of date, + -- node ('relay' or 'bridge'), country, transport, and version. Only + -- the rrx and nrx fields will contain number and seconds of reported + -- responses for the given combination of date, node, etc., while the + -- other fields will be updated below. + INSERT INTO aggregated (date, node, country, transport, version, rrx, + nrx) + SELECT date, node, country, transport, version, SUM(val) AS rrx, + SUM(seconds) AS nrx + FROM update WHERE metric = 'responses' + GROUP BY date, node, country, transport, version; + + -- Create another temporary table with only those entries that aren't + -- broken down by any dimension. 
This table is much smaller, so the + -- following operations are much faster. + CREATE TEMPORARY TABLE update_no_dimensions AS + SELECT fingerprint, node, metric, date, val, seconds FROM update + WHERE country = '' + AND transport = '' + AND version = ''; + + -- Update results in the aggregated table by setting aggregates based + -- on reported directory bytes. These aggregates are only based on + -- date and node, so that the same values are set for all combinations + -- of country, transport, and version. + UPDATE aggregated + SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh + FROM ( + SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh + FROM update_no_dimensions + WHERE metric = 'bytes' + GROUP BY date, node + ) aggregated_bytes + WHERE aggregated.date = aggregated_bytes.date + AND aggregated.node = aggregated_bytes.node; + + -- Update results based on nodes being contained in the network status. + UPDATE aggregated + SET nn = aggregated_status.nn + FROM ( + SELECT date, node, SUM(seconds) AS nn + FROM update_no_dimensions + WHERE metric = 'status' + GROUP BY date, node + ) aggregated_status + WHERE aggregated.date = aggregated_status.date + AND aggregated.node = aggregated_status.node; + + -- Update results based on nodes reporting both bytes and responses. + UPDATE aggregated + SET hrh = aggregated_bytes_responses.hrh + FROM ( + SELECT bytes.date, bytes.node, + SUM((LEAST(bytes.seconds, responses.seconds) + * bytes.val) / bytes.seconds) AS hrh + FROM update_no_dimensions bytes + LEFT JOIN update_no_dimensions responses + ON bytes.date = responses.date + AND bytes.fingerprint = responses.fingerprint + AND bytes.node = responses.node + WHERE bytes.metric = 'bytes' + AND responses.metric = 'responses' + GROUP BY bytes.date, bytes.node + ) aggregated_bytes_responses + WHERE aggregated.date = aggregated_bytes_responses.date + AND aggregated.node = aggregated_bytes_responses.node; + + -- Update results based on nodes reporting responses but no bytes. + UPDATE aggregated + SET nrh = aggregated_responses_bytes.nrh + FROM ( + SELECT responses.date, responses.node, + SUM(GREATEST(0, responses.seconds + - COALESCE(bytes.seconds, 0))) AS nrh + FROM update_no_dimensions responses + LEFT JOIN update_no_dimensions bytes + ON responses.date = bytes.date + AND responses.fingerprint = bytes.fingerprint + AND responses.node = bytes.node + WHERE responses.metric = 'responses' + AND bytes.metric = 'bytes' + GROUP BY responses.date, responses.node + ) aggregated_responses_bytes + WHERE aggregated.date = aggregated_responses_bytes.date + AND aggregated.node = aggregated_responses_bytes.node; + + -- We're done aggregating new data. + RAISE NOTICE '% Finishing aggregate step.', timeofday(); + RETURN; +END; +$$ LANGUAGE plpgsql; + +-- User-friendly view on the aggregated table that implements the +-- algorithm proposed in Tor Tech Report 2012-10-001. This view returns +-- user number estimates for both relay and bridge statistics, possibly +-- broken down by country or transport or version. +CREATE OR REPLACE VIEW estimated AS SELECT + + -- The date of this user number estimate. + a.date, + + -- The node type, which is either 'relay' or 'bridge'. + a.node, + + -- The two-letter lower-case country code of this estimate; can be '??' + -- for an estimate of users that could not be resolved to any country, + -- or '' (empty string) for an estimate of all users, regardless of + -- country.
+ a.country, + + -- The pluggable transport name of this estimate; can be '<OR>' for an + -- estimate of users that did not use any pluggable transport, '<??>' + -- for unknown pluggable transports, or '' (empty string) for an + -- estimate of all users, regardless of transport. + a.transport, + + -- The IP address version of this estimate; can be 'v4' or 'v6', or '' + -- (empty string) for an estimate of all users, regardless of IP address + -- version. + a.version, + + -- Estimated fraction of nodes reporting directory requests, which is + -- used to extrapolate observed requests to estimated total requests in + -- the network. The closer this fraction is to 1.0, the more precise + -- the estimation. + CAST(a.frac * 100 AS INTEGER) AS frac, + + -- Finally, the estimated number of users. + CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users + + -- Implement the estimation method in a subquery, so that the ugly + -- formula only has to be written once. + FROM ( + SELECT date, node, country, transport, version, rrx, nrx, + (hrh * nh + hh * nrh) / (hh * nn) AS frac + FROM aggregated WHERE hh * nn > 0.0) a + + -- Only include estimates with at least 10% of nodes reporting directory + -- request statistics. + WHERE a.frac BETWEEN 0.1 AND 1.0 + + -- Order results. + ORDER BY date DESC, node, version, transport, country; + diff --git a/modules/clients/merge-clients.R b/modules/clients/merge-clients.R new file mode 100644 index 0000000..cce7e9d --- /dev/null +++ b/modules/clients/merge-clients.R @@ -0,0 +1,19 @@ +require(reshape) +r <- read.csv("userstats-ranges.csv", stringsAsFactors = FALSE) +r <- melt(r, id.vars = c("date", "country")) +r <- data.frame(date = r$date, node = "relay", country = r$country, + transport = "", version = "", + variable = ifelse(r$variable == "maxusers", "upper", "lower"), + value = floor(r$value)) +u <- read.csv("userstats.csv", stringsAsFactors = FALSE) +u <- melt(u, id.vars = c("date", "node", "country", "transport", + "version")) +u <- data.frame(date = u$date, node = u$node, country = u$country, + transport = u$transport, version = u$version, + variable = ifelse(u$variable == "frac", "frac", "clients"), + value = u$value) +c <- rbind(r, u) +c <- cast(c, date + node + country + transport + version ~ variable) +c <- c[order(as.Date(c$date), c$node, c$country, c$transport, c$version), ] +write.csv(c, "clients.csv", quote = FALSE, row.names = FALSE, na = "") + diff --git a/modules/clients/src/org/torproject/metrics/clients/Main.java b/modules/clients/src/org/torproject/metrics/clients/Main.java new file mode 100644 index 0000000..2e6712b --- /dev/null +++ b/modules/clients/src/org/torproject/metrics/clients/Main.java @@ -0,0 +1,465 @@ +/* Copyright 2013 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.metrics.clients; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TimeZone; +import java.util.TreeMap; + +import org.torproject.descriptor.BandwidthHistory; +import org.torproject.descriptor.BridgeNetworkStatus; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExtraInfoDescriptor; +import
org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; + +public class Main { + + public static void main(String[] args) throws Exception { + parseArgs(args); + parseRelayDescriptors(); + parseBridgeDescriptors(); + closeOutputFiles(); + } + + private static boolean writeToSingleFile = true; + private static boolean byStatsDateNotByDescHour = false; + + private static void parseArgs(String[] args) { + if (args.length == 0) { + writeToSingleFile = true; + } else if (args.length == 1 && args[0].equals("--stats-date")) { + writeToSingleFile = false; + byStatsDateNotByDescHour = true; + } else if (args.length == 1 && args[0].equals("--desc-hour")) { + writeToSingleFile = false; + byStatsDateNotByDescHour = false; + } else { + System.err.println("Usage: java " + Main.class.getName() + + " [ --stats-date | --desc-hour ]"); + System.exit(1); + } + } + + private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L, + ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS, + ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS; + + private static void parseRelayDescriptors() throws Exception { + DescriptorReader descriptorReader = + DescriptorSourceFactory.createDescriptorReader(); + descriptorReader.setExcludeFiles(new File( + "status/relay-descriptors")); + descriptorReader.addDirectory(new File( + "../../shared/in/recent/relay-descriptors")); + Iterator<DescriptorFile> descriptorFiles = + descriptorReader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof ExtraInfoDescriptor) { + parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); + } else if (descriptor instanceof RelayNetworkStatusConsensus) { + parseRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } + } + } + } + + private static void parseRelayExtraInfoDescriptor( + ExtraInfoDescriptor descriptor) throws IOException { + long publishedMillis = descriptor.getPublishedMillis(); + String fingerprint = descriptor.getFingerprint(). + toUpperCase(); + long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis(); + long dirreqStatsIntervalLengthMillis = + descriptor.getDirreqStatsIntervalLength() * 1000L; + SortedMap<String, Integer> requests = descriptor.getDirreqV3Reqs(); + BandwidthHistory dirreqWriteHistory = + descriptor.getDirreqWriteHistory(); + parseRelayDirreqV3Reqs(fingerprint, publishedMillis, + dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, requests); + parseRelayDirreqWriteHistory(fingerprint, publishedMillis, + dirreqWriteHistory); + } + + private static void parseRelayDirreqV3Reqs(String fingerprint, + long publishedMillis, long dirreqStatsEndMillis, + long dirreqStatsIntervalLengthMillis, + SortedMap<String, Integer> requests) throws IOException { + if (requests == null || + publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS || + dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. */ + return; + } + long statsStartMillis = dirreqStatsEndMillis + - dirreqStatsIntervalLengthMillis; + long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS) + * ONE_DAY_MILLIS; + for (int i = 0; i < 2; i++) { + long fromMillis = i == 0 ? statsStartMillis + : utcBreakMillis; + long toMillis = i == 0 ? 
utcBreakMillis : dirreqStatsEndMillis; + if (fromMillis >= toMillis) { + continue; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + double sum = 0L; + for (Map.Entry<String, Integer> e : requests.entrySet()) { + String country = e.getKey(); + double reqs = ((double) e.getValue()) - 4.0; + sum += reqs; + writeOutputLine(fingerprint, "relay", "responses", country, + "", "", fromMillis, toMillis, reqs * intervalFraction, + publishedMillis); + } + writeOutputLine(fingerprint, "relay", "responses", "", "", + "", fromMillis, toMillis, sum * intervalFraction, + publishedMillis); + } + } + + private static void parseRelayDirreqWriteHistory(String fingerprint, + long publishedMillis, BandwidthHistory dirreqWriteHistory) + throws IOException { + if (dirreqWriteHistory == null || + publishedMillis - dirreqWriteHistory.getHistoryEndMillis() + > ONE_WEEK_MILLIS) { + return; + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. */ + } + long intervalLengthMillis = + dirreqWriteHistory.getIntervalLength() * 1000L; + for (Map.Entry<Long, Long> e : + dirreqWriteHistory.getBandwidthValues().entrySet()) { + long intervalEndMillis = e.getKey(); + long intervalStartMillis = + intervalEndMillis - intervalLengthMillis; + for (int i = 0; i < 2; i++) { + long fromMillis = intervalStartMillis; + long toMillis = intervalEndMillis; + double writtenBytes = (double) e.getValue(); + if (intervalStartMillis / ONE_DAY_MILLIS < + intervalEndMillis / ONE_DAY_MILLIS) { + long utcBreakMillis = (intervalEndMillis + / ONE_DAY_MILLIS) * ONE_DAY_MILLIS; + if (i == 0) { + toMillis = utcBreakMillis; + } else if (i == 1) { + fromMillis = utcBreakMillis; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) intervalLengthMillis); + writtenBytes *= intervalFraction; + } else if (i == 1) { + break; + } + writeOutputLine(fingerprint, "relay", "bytes", "", "", "", + fromMillis, toMillis, writtenBytes, publishedMillis); + } + } + } + + private static void parseRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) throws IOException { + long fromMillis = consensus.getValidAfterMillis(); + long toMillis = consensus.getFreshUntilMillis(); + for (NetworkStatusEntry statusEntry : + consensus.getStatusEntries().values()) { + String fingerprint = statusEntry.getFingerprint(). 
+ toUpperCase(); + if (statusEntry.getFlags().contains("Running")) { + writeOutputLine(fingerprint, "relay", "status", "", "", "", + fromMillis, toMillis, 0.0, fromMillis); + } + } + } + + private static void parseBridgeDescriptors() throws Exception { + DescriptorReader descriptorReader = + DescriptorSourceFactory.createDescriptorReader(); + descriptorReader.setExcludeFiles(new File( + "status/bridge-descriptors")); + descriptorReader.addDirectory(new File( + "../../shared/in/recent/bridge-descriptors")); + Iterator<DescriptorFile> descriptorFiles = + descriptorReader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof ExtraInfoDescriptor) { + parseBridgeExtraInfoDescriptor( + (ExtraInfoDescriptor) descriptor); + } else if (descriptor instanceof BridgeNetworkStatus) { + parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor); + } + } + } + } + + private static void parseBridgeExtraInfoDescriptor( + ExtraInfoDescriptor descriptor) throws IOException { + String fingerprint = descriptor.getFingerprint().toUpperCase(); + long publishedMillis = descriptor.getPublishedMillis(); + long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis(); + long dirreqStatsIntervalLengthMillis = + descriptor.getDirreqStatsIntervalLength() * 1000L; + parseBridgeDirreqV3Resp(fingerprint, publishedMillis, + dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, + descriptor.getDirreqV3Resp(), + descriptor.getBridgeIps(), + descriptor.getBridgeIpTransports(), + descriptor.getBridgeIpVersions()); + + parseBridgeDirreqWriteHistory(fingerprint, publishedMillis, + descriptor.getDirreqWriteHistory()); + } + + private static void parseBridgeDirreqV3Resp(String fingerprint, + long publishedMillis, long dirreqStatsEndMillis, + long dirreqStatsIntervalLengthMillis, + SortedMap<String, Integer> responses, + SortedMap<String, Integer> bridgeIps, + SortedMap<String, Integer> bridgeIpTransports, + SortedMap<String, Integer> bridgeIpVersions) throws IOException { + if (responses == null || + publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS || + dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. */ + return; + } + long statsStartMillis = dirreqStatsEndMillis + - dirreqStatsIntervalLengthMillis; + long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS) + * ONE_DAY_MILLIS; + double resp = ((double) responses.get("ok")) - 4.0; + if (resp > 0.0) { + for (int i = 0; i < 2; i++) { + long fromMillis = i == 0 ? statsStartMillis + : utcBreakMillis; + long toMillis = i == 0 ? 
utcBreakMillis : dirreqStatsEndMillis; + if (fromMillis >= toMillis) { + continue; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + writeOutputLine(fingerprint, "bridge", "responses", "", "", + "", fromMillis, toMillis, resp * intervalFraction, + publishedMillis); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "country", bridgeIps, + publishedMillis); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "transport", + bridgeIpTransports, publishedMillis); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions, + publishedMillis); + } + } + } + + private static void parseBridgeRespByCategory(String fingerprint, + long fromMillis, long toMillis, double resp, + long dirreqStatsIntervalLengthMillis, String category, + SortedMap<String, Integer> frequencies, long publishedMillis) + throws IOException { + double total = 0.0; + SortedMap<String, Double> frequenciesCopy = + new TreeMap<String, Double>(); + if (frequencies != null) { + for (Map.Entry<String, Integer> e : frequencies.entrySet()) { + if (e.getValue() < 4.0) { + continue; + } + double r = ((double) e.getValue()) - 4.0; + frequenciesCopy.put(e.getKey(), r); + total += r; + } + } + /* If we're not told any frequencies, or at least none of them are + * greater than 4, put in a default that we'll attribute all responses + * to. */ + if (total == 0) { + if (category.equals("country")) { + frequenciesCopy.put("??", 4.0); + } else if (category.equals("transport")) { + frequenciesCopy.put("<OR>", 4.0); + } else if (category.equals("version")) { + frequenciesCopy.put("v4", 4.0); + } + total = 4.0; + } + for (Map.Entry<String, Double> e : frequenciesCopy.entrySet()) { + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + double val = resp * intervalFraction * e.getValue() / total; + if (category.equals("country")) { + writeOutputLine(fingerprint, "bridge", "responses", e.getKey(), + "", "", fromMillis, toMillis, val, publishedMillis); + } else if (category.equals("transport")) { + writeOutputLine(fingerprint, "bridge", "responses", "", + e.getKey(), "", fromMillis, toMillis, val, publishedMillis); + } else if (category.equals("version")) { + writeOutputLine(fingerprint, "bridge", "responses", "", "", + e.getKey(), fromMillis, toMillis, val, publishedMillis); + } + } + } + + private static void parseBridgeDirreqWriteHistory(String fingerprint, + long publishedMillis, BandwidthHistory dirreqWriteHistory) + throws IOException { + if (dirreqWriteHistory == null || + publishedMillis - dirreqWriteHistory.getHistoryEndMillis() + > ONE_WEEK_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. 
*/ + return; + } + long intervalLengthMillis = + dirreqWriteHistory.getIntervalLength() * 1000L; + for (Map.Entry<Long, Long> e : + dirreqWriteHistory.getBandwidthValues().entrySet()) { + long intervalEndMillis = e.getKey(); + long intervalStartMillis = + intervalEndMillis - intervalLengthMillis; + for (int i = 0; i < 2; i++) { + long fromMillis = intervalStartMillis; + long toMillis = intervalEndMillis; + double writtenBytes = (double) e.getValue(); + if (intervalStartMillis / ONE_DAY_MILLIS < + intervalEndMillis / ONE_DAY_MILLIS) { + long utcBreakMillis = (intervalEndMillis + / ONE_DAY_MILLIS) * ONE_DAY_MILLIS; + if (i == 0) { + toMillis = utcBreakMillis; + } else if (i == 1) { + fromMillis = utcBreakMillis; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) intervalLengthMillis); + writtenBytes *= intervalFraction; + } else if (i == 1) { + break; + } + writeOutputLine(fingerprint, "bridge", "bytes", "", + "", "", fromMillis, toMillis, writtenBytes, publishedMillis); + } + } + } + + private static void parseBridgeNetworkStatus(BridgeNetworkStatus status) + throws IOException { + long publishedMillis = status.getPublishedMillis(); + long fromMillis = (publishedMillis / ONE_HOUR_MILLIS) + * ONE_HOUR_MILLIS; + long toMillis = fromMillis + ONE_HOUR_MILLIS; + for (NetworkStatusEntry statusEntry : + status.getStatusEntries().values()) { + String fingerprint = statusEntry.getFingerprint(). + toUpperCase(); + if (statusEntry.getFlags().contains("Running")) { + writeOutputLine(fingerprint, "bridge", "status", "", "", "", + fromMillis, toMillis, 0.0, publishedMillis); + } + } + } + + private static Map<String, BufferedWriter> openOutputFiles = + new HashMap<String, BufferedWriter>(); + private static void writeOutputLine(String fingerprint, String node, + String metric, String country, String transport, String version, + long fromMillis, long toMillis, double val, long publishedMillis) + throws IOException { + if (fromMillis > toMillis) { + return; + } + String fromDateTime = formatDateTimeMillis(fromMillis); + String toDateTime = formatDateTimeMillis(toMillis); + BufferedWriter bw = getOutputFile(fromDateTime, publishedMillis); + bw.write(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n", + fingerprint, node, metric, country, transport, version, + fromDateTime, toDateTime, val)); + } + + private static SimpleDateFormat dateTimeFormat = null; + private static String formatDateTimeMillis(long millis) { + if (dateTimeFormat == null) { + dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setLenient(false); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + return dateTimeFormat.format(millis); + } + + private static BufferedWriter getOutputFile(String fromDateTime, + long publishedMillis) throws IOException { + String outputFileName; + if (writeToSingleFile) { + outputFileName = "out/userstats.sql"; + } else if (byStatsDateNotByDescHour) { + outputFileName = "out/userstats-" + fromDateTime.substring(0, 10) + + ".sql"; + } else { + String publishedHourDateTime = formatDateTimeMillis( + (publishedMillis / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS); + outputFileName = "out/userstats-" + + publishedHourDateTime.substring(0, 10) + "-" + + publishedHourDateTime.substring(11, 13) + ".sql"; + } + BufferedWriter bw = openOutputFiles.get(outputFileName); + if (bw == null) { + bw = openOutputFile(outputFileName); + openOutputFiles.put(outputFileName, bw); + } + return bw; + } + + private static BufferedWriter openOutputFile(String 
outputFileName) + throws IOException { + File outputFile = new File(outputFileName); + outputFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + outputFileName)); + bw.write("BEGIN;\n"); + bw.write("LOCK TABLE imported NOWAIT;\n"); + bw.write("COPY imported (fingerprint, node, metric, country, " + + "transport, version, stats_start, stats_end, val) FROM " + + "stdin;\n"); + return bw; + } + + private static void closeOutputFiles() throws IOException { + for (BufferedWriter bw : openOutputFiles.values()) { + bw.write("\\.\n"); + bw.write("SELECT merge();\n"); + bw.write("SELECT aggregate();\n"); + bw.write("TRUNCATE imported;\n"); + bw.write("COMMIT;\n"); + bw.close(); + } + } +} + diff --git a/modules/clients/test-userstats.sql b/modules/clients/test-userstats.sql new file mode 100644 index 0000000..66f8b82 --- /dev/null +++ b/modules/clients/test-userstats.sql @@ -0,0 +1,478 @@ +BEGIN; +SET search_path TO tap, public; +SELECT plan(152); +SET client_min_messages = warning; + +-- Make sure enums are as expected. +SELECT has_enum('node'); +SELECT enum_has_labels('node', ARRAY['relay', 'bridge']); +SELECT has_enum('metric'); +SELECT enum_has_labels('metric', ARRAY['responses', 'bytes', 'status']); + +-- Make sure that the imported table is exactly as the importer expects +-- it. +SELECT has_table('imported'); +SELECT has_column('imported', 'fingerprint'); +SELECT col_type_is('imported', 'fingerprint', 'CHARACTER(40)'); +SELECT col_not_null('imported', 'fingerprint'); +SELECT has_column('imported', 'node'); +SELECT col_type_is('imported', 'node', 'node'); +SELECT col_not_null('imported', 'node'); +SELECT has_column('imported', 'metric'); +SELECT col_type_is('imported', 'metric', 'metric'); +SELECT col_not_null('imported', 'metric'); +SELECT has_column('imported', 'country'); +SELECT col_type_is('imported', 'country', 'CHARACTER VARYING(2)'); +SELECT col_not_null('imported', 'country'); +SELECT has_column('imported', 'transport'); +SELECT col_type_is('imported', 'transport', 'CHARACTER VARYING(20)'); +SELECT col_not_null('imported', 'transport'); +SELECT has_column('imported', 'version'); +SELECT col_type_is('imported', 'version', 'CHARACTER VARYING(2)'); +SELECT col_not_null('imported', 'version'); +SELECT has_column('imported', 'stats_start'); +SELECT col_type_is('imported', 'stats_start', + 'TIMESTAMP WITHOUT TIME ZONE'); +SELECT col_not_null('imported', 'stats_start'); +SELECT has_column('imported', 'stats_end'); +SELECT col_type_is('imported', 'stats_end', + 'TIMESTAMP WITHOUT TIME ZONE'); +SELECT col_not_null('imported', 'stats_end'); +SELECT has_column('imported', 'val'); +SELECT col_type_is('imported', 'val', 'DOUBLE PRECISION'); +SELECT col_not_null('imported', 'val'); +SELECT hasnt_pk('imported'); + +-- Make sure that the internally-used merged table is exactly as merge() +-- expects it. 
+SELECT has_table('merged'); +SELECT has_column('merged', 'id'); +SELECT col_type_is('merged', 'id', 'INTEGER'); +SELECT col_is_pk('merged', 'id'); +SELECT has_column('merged', 'fingerprint'); +SELECT col_type_is('merged', 'fingerprint', 'CHARACTER(40)'); +SELECT col_not_null('merged', 'fingerprint'); +SELECT has_column('merged', 'node'); +SELECT col_type_is('merged', 'node', 'node'); +SELECT col_not_null('merged', 'node'); +SELECT has_column('merged', 'metric'); +SELECT col_type_is('merged', 'metric', 'metric'); +SELECT col_not_null('merged', 'metric'); +SELECT has_column('merged', 'country'); +SELECT col_type_is('merged', 'country', 'CHARACTER VARYING(2)'); +SELECT col_not_null('merged', 'country'); +SELECT has_column('merged', 'transport'); +SELECT col_type_is('merged', 'transport', 'CHARACTER VARYING(20)'); +SELECT col_not_null('merged', 'transport'); +SELECT has_column('merged', 'version'); +SELECT col_type_is('merged', 'version', 'CHARACTER VARYING(2)'); +SELECT col_not_null('merged', 'version'); +SELECT has_column('merged', 'stats_start'); +SELECT col_type_is('merged', 'stats_start', + 'TIMESTAMP WITHOUT TIME ZONE'); +SELECT col_not_null('merged', 'stats_start'); +SELECT has_column('merged', 'stats_end'); +SELECT col_type_is('merged', 'stats_end', + 'TIMESTAMP WITHOUT TIME ZONE'); +SELECT col_not_null('merged', 'stats_end'); +SELECT has_column('merged', 'val'); +SELECT col_type_is('merged', 'val', 'DOUBLE PRECISION'); +SELECT col_not_null('merged', 'val'); + +-- Make sure that the internally-used aggregated table is exactly as +-- aggregate() expects it. +SELECT has_table('aggregated'); +SELECT has_column('aggregated', 'date'); +SELECT col_type_is('aggregated', 'date', 'DATE'); +SELECT col_not_null('aggregated', 'date'); +SELECT has_column('aggregated', 'node'); +SELECT col_type_is('aggregated', 'node', 'node'); +SELECT col_not_null('aggregated', 'node'); +SELECT has_column('aggregated', 'country'); +SELECT col_type_is('aggregated', 'country', 'CHARACTER VARYING(2)'); +SELECT col_not_null('aggregated', 'country'); +SELECT col_default_is('aggregated', 'country', ''); +SELECT has_column('aggregated', 'transport'); +SELECT col_type_is('aggregated', 'transport', 'CHARACTER VARYING(20)'); +SELECT col_not_null('aggregated', 'transport'); +SELECT col_default_is('aggregated', 'transport', ''); +SELECT has_column('aggregated', 'version'); +SELECT col_type_is('aggregated', 'version', 'CHARACTER VARYING(2)'); +SELECT col_not_null('aggregated', 'version'); +SELECT col_default_is('aggregated', 'version', ''); +SELECT has_column('aggregated', 'rrx'); +SELECT col_type_is('aggregated', 'rrx', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'rrx'); +SELECT col_default_is('aggregated', 'rrx', 0); +SELECT has_column('aggregated', 'nrx'); +SELECT col_type_is('aggregated', 'nrx', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'nrx'); +SELECT col_default_is('aggregated', 'nrx', 0); +SELECT has_column('aggregated', 'hh'); +SELECT col_type_is('aggregated', 'hh', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'hh'); +SELECT col_default_is('aggregated', 'hh', 0); +SELECT has_column('aggregated', 'nn'); +SELECT col_type_is('aggregated', 'nn', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'nn'); +SELECT col_default_is('aggregated', 'nn', 0); +SELECT has_column('aggregated', 'hrh'); +SELECT col_type_is('aggregated', 'hrh', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'hrh'); +SELECT col_default_is('aggregated', 'hrh', 0); +SELECT has_column('aggregated', 'nh'); 
+SELECT col_type_is('aggregated', 'nh', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'nh'); +SELECT col_default_is('aggregated', 'nh', 0); +SELECT has_column('aggregated', 'nrh'); +SELECT col_type_is('aggregated', 'nrh', 'DOUBLE PRECISION'); +SELECT col_not_null('aggregated', 'nrh'); +SELECT col_default_is('aggregated', 'nrh', 0); + +-- Create temporary tables that hide the actual tables, so that we don't +-- have to care about existing data, not even in a transaction that we're +-- going to roll back. Temporarily set log level to warning to avoid +-- messages about implicitly created sequences and indexes. +CREATE TEMPORARY TABLE imported ( + fingerprint CHARACTER(40) NOT NULL, + node node NOT NULL, + metric metric NOT NULL, + country CHARACTER VARYING(2) NOT NULL, + transport CHARACTER VARYING(20) NOT NULL, + version CHARACTER VARYING(2) NOT NULL, + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + val DOUBLE PRECISION NOT NULL +); +CREATE TEMPORARY TABLE merged ( + id SERIAL PRIMARY KEY, + fingerprint CHARACTER(40) NOT NULL, + node node NOT NULL, + metric metric NOT NULL, + country CHARACTER VARYING(2) NOT NULL, + transport CHARACTER VARYING(20) NOT NULL, + version CHARACTER VARYING(2) NOT NULL, + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + val DOUBLE PRECISION NOT NULL +); +CREATE TEMPORARY TABLE aggregated ( + date DATE NOT NULL, + node node NOT NULL, + country CHARACTER VARYING(2) NOT NULL DEFAULT '', + transport CHARACTER VARYING(20) NOT NULL DEFAULT '', + version CHARACTER VARYING(2) NOT NULL DEFAULT '', + rrx DOUBLE PRECISION NOT NULL DEFAULT 0, + nrx DOUBLE PRECISION NOT NULL DEFAULT 0, + hh DOUBLE PRECISION NOT NULL DEFAULT 0, + nn DOUBLE PRECISION NOT NULL DEFAULT 0, + hrh DOUBLE PRECISION NOT NULL DEFAULT 0, + nh DOUBLE PRECISION NOT NULL DEFAULT 0, + nrh DOUBLE PRECISION NOT NULL DEFAULT 0 +); + +-- Test merging newly imported data. 
+PREPARE new_imported(TIMESTAMP WITHOUT TIME ZONE, + TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO imported + (fingerprint, node, metric, country, transport, version, stats_start, + stats_end, val) VALUES ('1234567890123456789012345678901234567890', + 'relay', 'status', '', '', '', $1, $2, 0); +PREPARE new_merged(TIMESTAMP WITHOUT TIME ZONE, + TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO merged + (fingerprint, node, metric, country, transport, version, stats_start, + stats_end, val) VALUES ('1234567890123456789012345678901234567890', + 'relay', 'status', '', '', '', $1, $2, 0); + +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should insert new entry into empty table as is'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:00:00'); +EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should insert two non-contiguous entries'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00'); +EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should merge two contiguous entries'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before and ends after the start of ' || + 'another new entry'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00'); +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts at and ends after the start of ' || + 'another new entry'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts after another new entry starts and ' || + 'ends before that entry ends'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that has same start and end as another new entry'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME 
ZONE)$$, + 'Should skip entry that starts before and ends at the end of ' || + 'another new entry'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 16:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should insert entry that ends before existing entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should merge entry that ends when existing entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:30:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_start FROM merged', + $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before but ends after existing entry ' || + 'starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00'); +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts when existing entry ends but ' || + 'ends before another entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts when existing entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts after and ends before existing entry'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that is already contained'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that ends when existing entry ends'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00'); +SELECT merge(); 
+SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before but ends after existing entry ' || + 'ends'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +EXECUTE new_merged('2013-04-11 18:00:00', '2013-04-11 19:00:00'); +EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 19:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before existing entry ends and ends ' || + 'when another entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00'); +EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 17:00:00'); +EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before existing entry ends and ends ' || + 'after another entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should merge entry that ends when existing entry starts'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00'); +EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should insert entry that starts after existing entry ends'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before existing entry starts and ' || + 'ends after that entry ends'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_merged('2013-04-11 13:00:00', '2013-04-11 14:00:00'); +EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00'); +EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should skip entry that starts before and ends after multiple ' || + 'existing entries'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 23:00:00', '2013-04-12 00:00:00'); +EXECUTE new_imported('2013-04-12 00:00:00', '2013-04-12 01:00:00'); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-12 00:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-12 01:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should insert two contiguous entries that end and start at midnight'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE 
new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00'); +INSERT INTO imported (fingerprint, node, metric, country, transport, + version, stats_start, stats_end, val) VALUES + ('9876543210987654321098765432109876543210', 'relay', 'status', '', '', + '', '2013-04-11 12:00:00', '2013-04-11 17:00:00', 0); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should import two entries with different fingerprints and same ' || + 'start and end'); +DELETE FROM imported; +DELETE FROM merged; + +EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00'); +INSERT INTO imported (fingerprint, node, metric, country, transport, + version, stats_start, stats_end, val) VALUES + ('9876543210987654321098765432109876543210', 'relay', 'status', '', '', + '', '2013-04-11 14:00:00', '2013-04-11 16:00:00', 0); +SELECT merge(); +SELECT bag_eq('SELECT stats_end FROM merged', + $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE), + ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$, + 'Should import two entries with overlapping starts and ends and ' || + 'different fingerprints'); +DELETE FROM imported; +DELETE FROM merged; + +-- TODO Test aggregating imported and merged data. + +-- Make sure that the results view has the exact definition as expected +-- for the .csv export. +SELECT has_view('estimated'); +SELECT has_column('estimated', 'date'); +SELECT col_type_is('estimated', 'date', 'DATE'); +SELECT has_column('estimated', 'node'); +SELECT col_type_is('estimated', 'node', 'node'); +SELECT has_column('estimated', 'country'); +SELECT col_type_is('estimated', 'country', 'CHARACTER VARYING(2)'); +SELECT has_column('estimated', 'transport'); +SELECT col_type_is('estimated', 'transport', 'CHARACTER VARYING(20)'); +SELECT has_column('estimated', 'version'); +SELECT col_type_is('estimated', 'version', 'CHARACTER VARYING(2)'); +SELECT has_column('estimated', 'frac'); +SELECT col_type_is('estimated', 'frac', 'INTEGER'); +SELECT has_column('estimated', 'users'); +SELECT col_type_is('estimated', 'users', 'INTEGER'); + +-- TODO Test that frac and users are computed correctly in the view. + +-- Finish tests. +SELECT * FROM finish(); +RESET client_min_messages; +ROLLBACK; + diff --git a/modules/clients/userstats-detector.R b/modules/clients/userstats-detector.R new file mode 100644 index 0000000..c3a9041 --- /dev/null +++ b/modules/clients/userstats-detector.R @@ -0,0 +1,18 @@ +library("reshape") +export_userstats_detector <- function(path) { + c <- read.csv("userstats.csv", stringsAsFactors = FALSE) + c <- c[c$country != '' & c$transport == '' & c$version == '' & + c$node == 'relay', ] + u <- data.frame(country = c$country, date = c$date, users = c$users, + stringsAsFactors = FALSE) + u <- rbind(u, data.frame(country = "zy", + aggregate(list(users = u$users), + by = list(date = u$date), sum))) + u <- data.frame(date = u$date, country = u$country, + users = floor(u$users)) + u <- cast(u, date ~ country, value = "users") + names(u)[names(u) == "zy"] <- "all" + write.csv(u, path, quote = FALSE, row.names = FALSE) +} +export_userstats_detector("userstats-detector.csv") + diff --git a/shared/bin/80-run-clients-stats.sh b/shared/bin/80-run-clients-stats.sh new file mode 100755 index 0000000..325570c --- /dev/null +++ b/shared/bin/80-run-clients-stats.sh @@ -0,0 +1,30 @@ +#!/bin/sh +set -e + +cd modules/clients/ + +echo `date` "Parsing descriptors." 
+ant | grep "\[java\]" + +for i in $(ls out/*.sql) +do + echo `date` "Importing $i." + psql -f $i userstats +done + +echo `date` "Exporting results." +psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv + +echo `date` "Running censorship detector." +R --slave -f userstats-detector.R > /dev/null 2>&1 +python detector.py + +echo `date` "Merging censorship detector results." +R --slave -f merge-clients.R > /dev/null 2>&1 +mkdir -p stats/ +cp clients.csv stats/ + +echo `date` "Terminating." + +cd ../../ + diff --git a/shared/bin/99-copy-stats-files.sh b/shared/bin/99-copy-stats-files.sh index 5493cf8..4a30f24 100755 --- a/shared/bin/99-copy-stats-files.sh +++ b/shared/bin/99-copy-stats-files.sh @@ -3,4 +3,5 @@ mkdir -p shared/stats cp -a modules/legacy/stats/*.csv shared/stats/ cp -a modules/advbwdist/stats/advbwdist.csv shared/stats/ cp -a modules/hidserv/stats/hidserv.csv shared/stats/ +cp -a modules/clients/stats/clients.csv shared/stats/
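
Note on the data flow (not part of the commit): Main.java writes the SQL files under out/ that 80-run-clients-stats.sh later feeds to psql. Based on openOutputFile(), writeOutputLine(), and closeOutputFiles() above, each generated file is a single transaction that bulk-loads tab-separated rows into the imported table and then calls merge() and aggregate(). The sketch below only illustrates that shape; the fingerprint, country, timestamps, and value are invented.

  -- Illustration only: rough shape of a generated out/userstats*.sql file.
  BEGIN;
  LOCK TABLE imported NOWAIT;
  COPY imported (fingerprint, node, metric, country, transport, version, stats_start, stats_end, val) FROM stdin;
  1234567890123456789012345678901234567890  relay  responses  de      2013-04-11 00:00:00  2013-04-11 12:00:00  123.4
  \.
  SELECT merge();
  SELECT aggregate();
  TRUNCATE imported;
  COMMIT;

(The data line is tab-separated; country, transport, and version may be empty strings, exactly as writeOutputLine() emits them.)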
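For reference, the estimation in the estimated view boils down to two expressions taken directly from the view definition: frac = (hrh * nh + hh * nrh) / (hh * nn) and users = rrx / (frac * 10); the view then reports frac as a percentage (CAST(a.frac * 100 AS INTEGER)) and casts users to INTEGER. The constant 10 presumably corresponds to the assumed number of directory requests per client and day from the tech report. The stand-alone query below is only a worked example with made-up aggregates, not part of the commit; with these numbers it returns frac = 1.0 and users = 200.

  -- Illustration only: one relay day with 2000 reported responses and
  -- 10 MiB of directory bytes, where the full status interval also
  -- reported bytes and responses.
  SELECT (hrh * nh + hh * nrh) / (hh * nn) AS frac,
         rrx / (((hrh * nh + hh * nrh) / (hh * nn)) * 10) AS users
  FROM (SELECT 2000.0 AS rrx,        -- r(R): reported responses
               10485760.0 AS hh,     -- h(H): reported directory bytes
               10485760.0 AS hrh,    -- h(R intersect H): bytes from nodes that also reported responses
               86400.0 AS nn,        -- n(N): seconds of nodes in the status
               86400.0 AS nh,        -- n(H): seconds of nodes reporting bytes
               0.0 AS nrh            -- n(R \ H): seconds reporting responses but no bytes
       ) a;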