On 2009-03-24 at 11:57 -0400, Daniel Kahn Gillmor wrote:
> Are these scripts published? (this is out of curiosity more than
> anything else)

Well, since it's just curiosity, I'll pipe in with something roughly
equivalent to the IP generation side:


No DNS integration, but it provides the data in a format which should be
easy to script into DNS zonefiles.  The "no more than 500 behind" value
is chosen to match that of sks-keyservers.net, but the reference value
is derived with some fast and loose stats, instead of being the local
value.  Search for "def handle_ip_valid" in the script.

This is generated by the attached Python 2.6 script, launched from
Apache/mod_wsgi with:
  <IfModule mod_wsgi.c>
    WSGIDaemonProcess   sks.spodhuis.org user=sks display-name=%{GROUP}
    WSGIProcessGroup    sks.spodhuis.org

    WSGIScriptAlias     /sks-peers 

This is a script/module rooted at 'sks-peers'; you'll want to change the
first few kCONSTANTS in the script for local values -- especially to
grant yourself, not me, privileged access to the tool (to see
internals).  /sks-peers/helpz will give you links to the various admin
URLs; they're read-only views of the data, except for rescanz.

You can invoke it without mod_wsgi, just invoke as:
  ./sks_peers.py standalone
and look at http://localhost:8080/ip-valid instead.

I think the Python 2.6-isms are fairly few, but they're there
(numbers.Integral, etc).  You need some non-standard Python modules, all
in easy reach with easy_install; they're the second batch of imports, and
the only one whose name isn't just the import's top-level module name
has a comment indicating the name.

The script will then on first retrieval go gather data and report back
"no data yet"; it's persistent past the first retrieval, setting up a
data gathering thread to periodically re-poll, and reloading when
the 'membership' file changes.

After about 30 seconds (when the bad server URL retrievals time out)
you'll have data.  /sks-peers is a table view in HTML with links, etc.
It's ugly as I'm not a UI designer.  /sks-peers/ip-valid reports data in
a format that should be fairly comprehensible:
  status line, including a count
  one IP address per line; number of lines given by count in first line
  a line containing just "."

Working output starts:
  IP-Gen/1: status=COMPLETE count=29 tags=skip_1010
and non-working might be:
  IP-Gen/1: status=INVALID count=0 reason=first_scan

Feel free to adapt for your own use (but credit would be nice); no
warranties, etc etc.  At your own risk.

# $HeadURL: https://svn.spodhuis.org/ksvn/websites/trunk/sks.spodhuis.org/wsgi/sks_peers.py $
# $Id: sks_peers.py 134 2009-03-24 21:32:47Z XXX $
# Author: Phil Pennock
# Copyright 2009; use/modify/copy freely with attribution.
# No warranties whatsoever; only use if you're able to assess this code.
"""
WSGI application to serve up a table of peers of this server.
Main entry point is 'application', if 'main' invoked then can choose
to start a standalone webserver.
"""

import gc
from math import sqrt
import numbers
import os
import Queue
import random
import re
import signal
import socket
import sys
import syslog
import textwrap  # purely for /internalz
import threading
import time
import urllib2

from BeautifulSoup import BeautifulSoup
import Cheetah.Template
import dns.resolver     # dnspython
import ipaddr
import selector

# Shared state.  sks_data is the most recent gathered data set (or None
# before the first scan); the DataGatherer thread replaces it wholesale
# while holding sks_data_lock, and the WSGI handlers read it under the same
# lock.  sks_data_lock and wsgi_selector are both created by
# sks_peers_init().
sks_data = None
sks_data_lock = None
startup_lock = threading.Lock()  # see sks_peers_init() for rationale
wsgi_selector = None

# Site-local settings: change these for your own installation.
# The two names this server is known by; used to seed the walker's "seen"
# map and to decide whether a peer lists us back ("mutual").
kHOSTNAME = 'sks.spodhuis.org'
kHOSTNAME_ALT = 'sks-peer.spodhuis.org'
# NOTE(review): this address looks truncated by list-archive obfuscation.
kMAINT_EMAIL = 'webmas...@spodhuis.org'
# SKS membership file; the direct peers are read from it.
kSKS_MEMBERSHIP = '/var/sks/membership'
# Port used when resolving peer addresses, and the port a peer must list
# for us to count as mutual.
kRECON_PORT = 11370
# Port the op=stats page is fetched from.
kHKP_PORT = 11371
# DNS zone queried (TXT records) for per-IP geography.
kCOUNTRIES_ZONE = 'zz.countries.nerd.dk.'
kIPLIST_NEED_THRESHOLD = 500  # use the sks-keyservers.net value
# Identity passed to syslog.openlog().
kSYSLOG_NAME = 'sks-peers'
# Peer names that are never walked.
kSKIP_ENTRIES = ('localhost', '', '::1')

# This one may *not* contain any variables or directives as it may be used
# by quick & dirty handlers, rather than the template system
"""<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
  <meta http-equiv="Content-Script-Type" content="application/ecmascript; charset=UTF-8">
  <meta http-equiv="Content-Style-Type" content="text/css; charset=UTF-8">
  <meta http-equiv="imagetoolbar" content="no"> <!-- MSIE control -->
  <link rel="stylesheet" href="/style/spodhuis-sks.css" type="text/css" charset="utf-8">
  <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">

  <link rev="made" href="mailto:$maintainer">
  <div class="usererror">$error</div>

  <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
  <link rev="made" href="mailto:$maintainer">
  <title>$my_hostname Peer Mesh</title>
  <h1>$my_hostname Peer Mesh</h1>
   Entries at depth 0 are direct peers.  Others are seen by spidering the peers.
  <table class="sks peertable">

   <caption>SKS has $peer_count peers of $mesh_count visible</caption>

   <tr class="peer host $rowclass">
    <td class="hostname"$rowspan>$html_link$host_aliases_text</td>
    <td class="ipaddr">$ip</td>
    <td class="location">$geo</td>
    <td class="mutual"$rowspan>$mutual</td>
    <td class="version"$rowspan>$version</td>
    <td class="keys"$rowspan>$keycount</td>
    <td class="peer_distance"$rowspan>$distance</td>

   <tr class="peer host failure $rowclass">
    <td class="hostname">$hostname</td>
    <td class="exception" colspan="6">Error: $error</td>

   <tr class="peer more">
    <td class="ipaddr">$ip</td><td class="location">$geo</td>

class Error(Exception):
  """Base class for the exceptions this tool raises itself."""
  pass

class InternalUsageError(Error):
  """Raised when this tool's own code invokes one of its internals wrongly."""
  pass

def debug(msg='?', *args):
  """Send a %-formatted message to syslog at LOG_DEBUG priority.

  Args:
    msg: format string; used verbatim when no further arguments are given.
    *args: positional values interpolated into msg.
  """
  # Only interpolate when there is something to interpolate, so a literal
  # message that happens to contain '%' cannot raise a formatting error.
  if args:
    msg = msg % args
  syslog.syslog(syslog.LOG_DEBUG, msg)

def log(msg='?', *args):
  """Send a %-formatted message to syslog at LOG_INFO priority.

  Args:
    msg: format string; used verbatim when no further arguments are given.
    *args: positional values interpolated into msg.
  """
  # Only interpolate when there is something to interpolate, so a literal
  # message that happens to contain '%' cannot raise a formatting error.
  if args:
    msg = msg % args
  syslog.syslog(syslog.LOG_INFO, msg)

def suicide_delayed(delay=3):
  """Define a helper that sends SIGTERM to this very process.

  NOTE(review): in this copy kill_me is never called or scheduled and
  `delay` is unused, so calling this function has no effect.  The lines
  that arm the delayed kill appear to be missing -- confirm against the
  original script.
  """
  def kill_me():
    os.kill(os.getpid(), signal.SIGTERM)

# signal handler params must come first
def act_reload_queue(signal=None, frame=None, note=None):
  """Ask the current data gatherer for an immediate rescan.

  Usable directly as a signal handler (hence the leading signal/frame
  parameters) or called by hand with a note.

  Returns:
    False when no data set, or no gatherer within it, is available yet;
    otherwise whatever the gatherer's rescan() returns.
  """
  global sks_data_lock, sks_data
  with sks_data_lock:
    if sks_data is None or 'gatherer' not in sks_data:
      return False
    gatherer = sks_data['gatherer']
  # The rescan request itself is made outside the lock.
  if signal is not None and note is None:
    note = 'Signal %d received' % signal
  return gatherer.rescan(note)

def html_escape(s):
  """HTML Entity escaping, safe for element content and quoted attributes.

  Args:
    s: any object; it is converted with str() first.

  Returns:
    The text with &, <, >, double and single quotes replaced by entities.
  """
  text = str(s)
  # '&' must go first so the entities produced below are not re-escaped.
  # The quote characters matter because callers interpolate the result
  # into attribute values such as href="...".
  for raw, entity in (('&', '&amp;'), ('<', '&lt;'), ('>', '&gt;'),
                      ('"', '&quot;'), ("'", '&#39;')):
    text = text.replace(raw, entity)
  return text

def html_interp(*params):
  """HTML Entity escape each param, returning results as single tuple.

  Useful for interpolation by providing this as single % param.
  """
  # The docstring terminator above was missing, which left the return
  # statement inside the string literal.
  return tuple(html_escape(param) for param in params)

def wrap_check_privileged(handler):
  """WSGI method decorator, check authorisation to access.

  The wrapped handler only runs when REMOTE_ADDR is a loopback address or
  lies inside one of the kPRIVILEGED_ACCESS networks.  Any other request,
  including one whose REMOTE_ADDR is missing or unparseable, is answered
  by return_error().

  Args:
    handler: WSGI callable taking (environ, start_response).

  Returns:
    A WSGI callable with the same signature.
  """
  def _wsgi_hook(environ, start_response):
    ip = environ.get('REMOTE_ADDR', None)
    if not ip:
      return return_error(environ, start_response)
    try:
      ip = ipaddr.IP(ip)
    except ValueError:
      # Not an address we can reason about, so treat as unprivileged.
      return return_error(environ, start_response)
    # Only IPv4 objects have .IsLoopback() -- *sigh* factories returning
    # objects with inconsistent APIs
    if hasattr(ip, 'IsLoopback'):
      if ip.IsLoopback():
        return handler(environ, start_response)
    elif isinstance(ip, ipaddr.IPv6):
      if ip == ipaddr.IP('::1'):
        return handler(environ, start_response)
    for block in kPRIVILEGED_ACCESS:
      if ipaddr.IP(block).Contains(ip):
        return handler(environ, start_response)
    return return_error(environ, start_response)
  return _wsgi_hook

def _sort_hostnames(iterable):
  """Sort the iterable parameter as hostnames, least-specific label to most."""
  keyed = []
  for entry in iterable:
    key = entry.split('.')
    keyed.append( (key, entry) )

  def _cmp_keyed(a, b):
    return cmp(a[0], b[0]) or cmp(a[1], b[1])

  for entry in sorted(keyed, cmp=_cmp_keyed):
    yield entry[1]

class AddressInfo(object):
  """Information about the IP addressing of an SKS peer.

  Constructing an instance resolves the hostname to its addresses, looks
  up geography for each address, registers the instance in the class-wide
  registry and reports the outcome to the walker callback.  Iterating an
  instance yields (ip, geography) pairs; len() is the address count.

  NOTE(review): several lines of this class were missing from the copy
  this was restored from (decorators, the resolve calls in __init__ and a
  few loop bodies).  The restored statements are marked below; confirm
  them against the original script.
  """
  # Registry of every distinct peer seen so far, shared by all instances
  # and guarded by _all_lock.
  _all_lock = threading.Lock()
  _all_ai = []
  # Outcomes of register_address_info().
  NEW_ITEM = 1
  NEW_NAME = 2
  OLD_ITEM = 3

  @classmethod
  def find_address_info(cls, obj, insert=False):
    """Return the registered AddressInfo sharing an IP with obj, else None.

    Args:
      obj: AddressInfo (or any iterable of (ip, geography) pairs).
      insert: when true and nothing matches, add obj to the registry.
    """
    newips = [x[0] for x in obj]
    #debug('AI.fai(%s): have IPs: %s', obj.hostname, ' '.join(newips))
    with cls._all_lock:
      for ai in cls._all_ai:
        for ip in newips:
          if ip in ai.addresses:
            return ai
      if insert:
        # Restored: the body of this branch was missing.
        cls._all_ai.append(obj)
    return None

  @classmethod
  def register_address_info(cls, obj):
    """Register obj, or record its hostname as an alias of a known peer.

    Returns:
      NEW_ITEM if obj was added to the registry, NEW_NAME if its hostname
      became a new alias of an existing entry, OLD_ITEM otherwise.
    """
    existing = cls.find_address_info(obj, insert=True)
    if not existing:
      return cls.NEW_ITEM
    added_name = existing.add_alt_name(obj.hostname)
    if added_name:
      return cls.NEW_NAME
    return cls.OLD_ITEM

  def __init__(self, hostname, walk_state, walker_cb=None, **kwargs):
    """Resolve hostname and register the result.

    Args:
      hostname: peer name to resolve.
      walk_state: opaque value handed back to walker_cb.
      walker_cb: optional callable invoked as
        walker_cb(walk_state, (hostname, self, status)).
      **kwargs: accepted and ignored.

    Resolution errors from socket.getaddrinfo propagate to the caller.
    """
    self.hostname = hostname
    self.portstr = '%d' % kRECON_PORT
    self._walk_state = walk_state
    self._walker_cb = walker_cb
    self.addresses = list()
    self.alt_names = list()
    self.geography = dict()
    #debug('AddressInfo(%s): created', hostname)
    self.update_lock = threading.Lock()
    with self.update_lock:
      # Restored: the two resolve calls were missing.
      self.resolve_ips()
      # resolve geography first, to populate the dict, to keep iter sane below
      self.resolve_geographies()
      status = self.__class__.register_address_info(self)
      if self._walker_cb is not None:
        self._walker_cb(self._walk_state, (self.hostname, self, status))

  @classmethod
  def _resolve_txt(cls, searchname):
    """Return the strings of every TXT record found at searchname."""
    answers = dns.resolver.query(searchname, 'TXT')
    if not answers:
      return []
    res = []
    for rdata in answers:
      # Restored: loop body was missing.  Assumes dnspython TXT rdata
      # exposes its character-strings as .strings -- TODO confirm.
      res.extend(rdata.strings)
    return res

  @classmethod
  def _resolve_ip_geography(cls, ip):
    """Return geography strings for an IPv4 address, or None.

    None is returned for IPv6 addresses and when the DNS lookup fails.
    """
    # This sucks, but even googleip doesn't have a 'reverse' method
    # suitable for use with DNS
    if ip.find(':') != -1:
      return None
    # XXX: Assumes either IPv6 or IPv4
    search = ip.split('.')
    # Restored: the octets are reversed before the zone is appended
    # (presumably the zone is laid out like a DNSBL -- TODO confirm).
    search.reverse()
    search = '.'.join(search) + '.' + kCOUNTRIES_ZONE
    try:
      return list(cls._resolve_txt(search))
    except dns.exception.DNSException:
      # A failed geography lookup should not discard the address itself.
      return None

  def add_alt_name(self, newname):
    """Record newname as an alias; return True only if it was new."""
    with self.update_lock:
      if newname == self.hostname:
        return False
      if newname in self.alt_names:
        return False
      if newname in self.addresses:
        return False
      # Restored: the name was never stored.
      self.alt_names.append(newname)
      return True

  def resolve_ips(self):
    """Populate self.addresses with the numeric addresses of the host."""
    addrs = []
    for (family, socktype, proto, canonname, sockaddr) in socket.getaddrinfo(
        self.hostname, self.portstr,
        socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP,
        socket.AI_ADDRCONFIG | socket.AI_NUMERICSERV):
      (addr, port) = socket.getnameinfo(sockaddr,
          socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
      # Restored: results were never collected.
      if addr not in addrs:
        addrs.append(addr)
    self.addresses = addrs

  def resolve_geographies(self):
    """Fill self.geography for every address currently known."""
    for ip in self.addresses:
      self.geography[ip] = self._resolve_ip_geography(ip)

  def __repr__(self):
    return 'AddressInfo(%s)' % self.hostname

  def __len__(self):
    return len(self.addresses)

  def __iter__(self):
    for ip in self.addresses:
      yield (ip, self.geography[ip])

  def iter_all(self):
    """Yield the hostname, then every alias, then every address."""
    yield self.hostname
    for x in self.alt_names:
      yield x
    for x in self.addresses:
      yield x

class SksNode(object):
  """Information retrieved from an SKS peer.

  Construction fetches the peer's op=stats page and parses it.  A failed
  fetch or parse is recorded in self.exception rather than raised, and
  the object is then false in a boolean context.

  Attributes set after a successful parse: peers (name -> port text),
  mutual, mailsync, version, keycount.  distance is always set.

  NOTE(review): the try:/else:/continue lines and the fetch/analyse calls
  in __init__ were missing from the copy this was restored from; confirm
  the restored statements against the original script.
  """
  def __init__(self, hostname, walk_state, walker_cb=None, **kwargs):
    """Fetch and analyse the stats page of hostname.

    Args:
      hostname: peer to query.
      walk_state: depth of this peer in the walk; a non-integer value is
        reported as distance '-1'.
      walker_cb: optional callable invoked as walker_cb(walk_state, name)
        for every gossip peer listed by this node.
      **kwargs: accepted and ignored.
    """
    self.hostname = hostname
    self.portstr = '%d' % kHKP_PORT
    self._url = 'http://%s:%s/pks/lookup?op=stats' % (self.hostname, self.portstr)
    self.exception = None
    self._walk_state = walk_state
    if isinstance(walk_state, numbers.Integral):
      self.distance = str(walk_state)
    else:
      self.distance = '-1'
    self._walker_cb = walker_cb
    #debug('SksNode(%s): created', hostname)
    # Restored: nothing else visible triggers the retrieval.
    self.fetch_page()
    self.analyse()

  def fetch_page(self):
    """Download the stats page into self._data, recording any failure."""
    try:
      self._data = urllib2.urlopen(self._url, timeout=kSTATS_FETCH_TIMEOUT).read()
    # urllib2 has URLError and HTTPError, subclassing from IOError; however,
    # it uses httplib internally which tends to throw BadStatusLine; all of
    # httplib's exceptions subclass from a local HTTPException class
    except (IOError, urllib2.httplib.HTTPException) as e:
      self.exception = e

  def _table_following(self, search):
    """Return the first <table> after the element containing text search."""
    s = self._soup.find(text=search).parent.nextSibling
    # straight s.name fails for text as it has no name attribute
    while getattr(s, 'name', '') != u'table':
      s = s.nextSibling
    return s

  def _plain_rows_of(self, search):
    """Return the set of stripped cell texts of the table after search."""
    table = self._table_following(search)
    return set([str(x.contents[0].strip()) for x in table.findAll('td')])

  def _dict_from_plain_rows(self, search):
    """Map the first word of each cell to its second word.

    Cells with fewer than two words are skipped and extra words are
    ignored, so one odd row cannot abort the whole parse.
    """
    table = self._table_following(search)
    pairs = dict()
    for cell in table.findAll('td'):
      fields = cell.contents[0].strip().split()
      if len(fields) >= 2:
        pairs[fields[0]] = fields[1]
    return pairs

  def _kvdict_from_table(self, search):
    """Return {first cell minus trailing ':' -> second cell} per row."""
    table = self._table_following(search)
    d = dict()
    for r in table.findAll('tr'):
      pair = [x.contents[0].strip() for x in r.findAll('td')]
      d[pair[0].rstrip(':')] = pair[1]
    return d

  def analyse(self):
    """Parse the fetched page; does nothing if the fetch already failed."""
    if self.exception is not None:
      return
    try:
      self._soup = BeautifulSoup(self._data)
    except Exception as e:
      self.exception = e
      return
    self.peers = self._dict_from_plain_rows(u'Gossip Peers')
    if self._walker_cb:
      for peer in self.peers:
        if peer in kSKIP_ENTRIES:
          #debug('Analyse(%s): skipping blacklisted item {%s}', self.hostname, peer)
          continue
        self._walker_cb(self._walk_state, str(peer))
    else:
      debug('Analyse(%s): no walker', self.hostname)
    self.mutual = False
    for me in (kHOSTNAME, kHOSTNAME_ALT):
      try:
        if me in self.peers and int(self.peers[me]) == kRECON_PORT:
          self.mutual = True
      except ValueError:
        # A non-numeric port simply means "not mutual via this name".
        pass
    self.mailsync = self._plain_rows_of(u'Outgoing Mailsync Peers')
    self._settings = self._kvdict_from_table(u'Settings')
    # settings should have: Hostname Version 'HTTP port' 'Recon port' 'Debug level'
    self.version = self._settings['Version']
    # Default first, so keycount exists even when the text does not match.
    self.keycount = -1
    try:
      t = str(self._soup.find(text=u'Statistics').parent.nextSibling.string)
      if t.startswith('Total number of keys'):
        self.keycount = int(t.split(':')[1])
    except (AttributeError, IndexError, ValueError):
      self.keycount = -1
    del self._data, self._soup

  def __nonzero__(self):
    """True when no fetch or parse error was recorded."""
    if self.exception:
      return False
    return True

  # Same truth test under the Python 3 spelling.
  __bool__ = __nonzero__

  def __str__(self):
    """The error text if one was recorded, else the peer's own hostname.

    Falls back to the name we queried when the parsed settings are
    unavailable.
    """
    if self.exception:
      return str(self.exception)
    try:
      return str(self._settings['Hostname'])
    except Exception:
      return self.hostname

class PoolWorkerFactory(object):
  """Make generic pool workers."""

  @classmethod
  def make(cls, name, worker_class):
    """Our factory entry point.  Generate a PoolWorker.

    Args:
      name: name of the generated class; also the prefix of thread names.
      worker_class: callable invoked as
        worker_class(data, walk_state=depth, walker_cb=cb) per work item.

    Returns:
      A threading.Thread subclass whose constructor takes
      (index, inq, outq, walker_cb).  Each instance loops forever taking
      (depth, data) tuples from inq and putting (data, result) on outq,
      where result is the worker_class return value or the exception it
      raised.
    """

    def run_init(self, index, inq, outq, walker_cb):
      """PoolWorker __init__."""
      # The Thread base must be initialised before name/daemon are set.
      threading.Thread.__init__(self)
      self.name = '%s[%d]' % (name, index)
      self.daemon = True
      self._in_queue = inq
      self._out_queue = outq
      self._walker_cb = walker_cb

    def do_worker(self, input_data, depth):
      """Invoke the wrapper class's constructor for the current data."""
      try:
        return worker_class(input_data, walk_state=depth, walker_cb=self._walker_cb)
      except Exception as e:
        # Failures are delivered as results so that one bad item cannot
        # kill the worker thread.
        return e

    def do_run(self):
      """The thread run() method for the constructed classes."""
      while True:
        depth, data = self._in_queue.get()
        try:
          self._out_queue.put( (data, self._do_worker(data, depth=depth)) )
        finally:
          # Lets a join() on the input queue observe completion.
          self._in_queue.task_done()

    d = dict()
    d['__init__'] = run_init
    d['_do_worker'] = do_worker
    d['run'] = do_run
    return type(name, (threading.Thread,), d)

class PoolWorkManager(object):
  """Run workers asynchronously, emitting results in a queue.

  Eg, with ThreadResolve, we create the resolver, pass it hostnames,
  pull out tuples of (hostname, AddressInfo) object in non-deterministic order.

  NOTE(review): the docstring terminator, the lines that start the worker
  threads and the body of join() were missing from the copy this was
  restored from; confirm the restored statements against the original.
  """

  # There's not much point to getting a shutdown Event too, as we can't have
  # timeouts in Queue obj.join() so no way to sensibly interrupt a collect.
  def __init__(self, worker_name, data_class, thread_count, walker_cb=None):
    """Create the queues and start thread_count worker threads.

    Args:
      worker_name: name given to the generated worker class.
      data_class: callable run for every queued item.
      thread_count: positive integer number of threads.
      walker_cb: optional callback handed to every worker.

    Raises:
      InternalUsageError: thread_count is not a positive integer.
    """
    if not isinstance(thread_count, numbers.Integral) or thread_count < 1:
      raise InternalUsageError("Not a positive integer: %s" % str(thread_count))
    worker_class = PoolWorkerFactory.make(worker_name, data_class)

    self._in_queue = Queue.Queue()
    self._out_queue = Queue.Queue()
    self._threads = list()
    self._previous_results = {}
    for i in range(0, thread_count):
      t = worker_class(i, self._in_queue, self._out_queue, walker_cb)
      # Restored: without these the pool never processes anything.
      t.start()
      self._threads.append(t)

  def __len__(self):
    """Number of results still to be returned.  Includes pending."""
    # qsize() does not see an item a worker has taken but not yet
    # answered, so this can briefly under-count.
    return self._in_queue.qsize() + self._out_queue.qsize()

  def add_depthed(self, depth, query):
    """Queue query for processing at the given walk depth."""
    self._in_queue.put((depth, query))

  def add(self, query):
    """Queue query for processing at depth 0."""
    self.add_depthed(0, query)

  def join(self):
    """Block until every queued query has been processed.

    Relies on the workers calling task_done() on the input queue for
    each item they take -- NOTE(review): confirm the worker loop does.
    """
    self._in_queue.join()

  def get(self):
    """Return the next (query, result) tuple, blocking until available."""
    res = self._out_queue.get()
    return res

  def collect(self):
    """Drain available results into the cumulative {query: result} dict.

    Returns:
      The same dict on every call, extended with whatever has arrived.
    """
    results = self._previous_results
    while self:
      (k, d) = self.get()
      results[k] = d
    self._previous_results = results
    return results

# NOTE(review): several statements of this class appear to be missing from
# this copy (branch bodies, else:/elif/try: lines); the gaps are marked
# where they are visible.  Confirm against the original script before use.
class RecursingWalkManager(threading.Thread):
  """Handle requests to see more data."""

  def __init__(self):
    # NOTE(review): threading.Thread.__init__ is not called in the visible
    # lines -- presumably lost; confirm.
    self.name = 'RecursingWalkManager'
    self.daemon = True
    # Work items: ('DNS'|'NODE'|'NODE+DNS', depth, entry); see run().
    self._queue = Queue.Queue()
    self._managers = None
    self._have_managers = threading.Semaphore(0)
    self.okay_next_time = set()
    self.seen = { kHOSTNAME: 0, kHOSTNAME_ALT: 0 }  # host -> depth
    self.pending = dict()  # host -> depth
    # Guards seen and pending; re-entrant because the handlers call
    # helpers that take it again.
    self._hosts_lock = threading.RLock()

  def __len__(self):
    # Number of queued work items not yet taken by run().
    return self._queue.qsize()

  def set_managers(self, manager_dns, manager_node):
    """Sets the manager objects for handling work dispatch."""
    self._manager_dns = manager_dns
    self._manager_node = manager_node

  def add_dns(self, depth, entry):
    # Queue a DNS result; entry is unpacked as (host, ai, status) by
    # _handle_dns_inthread.
    self._queue.put(('DNS', depth, entry))

  def add_node(self, depth, entry):
    # Queue a node name reported at the given depth.
    self._queue.put(('NODE', depth, entry))

  def set_seen(self, name, depth=0):
    """We have seen an entry with this name, at this depth."""
    with self._hosts_lock:
      if name not in self.seen:
        self.seen[name] = depth

  def _requeue_node(self, node, ai):
    """Shuffle node back from pending to try to reprocess (we have DNS)."""
    with self._hosts_lock:
      if node in self.pending:
        depth = self.pending[node]
        del self.pending[node]
        depth -= 1  # already incremented, will be re-incremented
        self._queue.put(('NODE+DNS', depth, (node, ai)))
        return True
      return False

  def _discard_node(self, node):
    """We have DNS and this is a dup!"""
    with self._hosts_lock:
      if node in self.pending:
        del self.pending[node]
      return True

  def _handle_dns_inthread(self, depth, entry):
    # Processes one 'DNS' work item.
    host, ai, status = entry
    with self._hosts_lock:
      if status == AddressInfo.NEW_ITEM:
        #debug('Walk: Seen DNS item for first time: %s', host)
        # NOTE(review): the body of the following `if` is missing.
        if not self._requeue_node(host, ai):
      elif host in self.okay_next_time:
        #debug('Walk: Not first time seen DNS, but first time acting on it: %s', host)
        self._requeue_node(host, ai)
        # NOTE(review): an `else:` and the bodies of the branches below
        # appear to be missing; only their debug comments remain.
        if host == ai.hostname:
          #debug('Walk: skipping previously seen DNS: %s', host)
          #debug('Walk: discarding dup DNS: %s (%s)', host, ' '.join(ai.iter_all()))
        # Record every name/address of this entry at the shallowest
        # non-negative depth observed.
        for n in ai.iter_all():
          if n not in self.seen or (depth < self.seen[n] and depth >= 0):
            self.seen[n] = depth

  def _handle_node_inthread(self, depth, entry, ai=None):
    # Processes one 'NODE' (ai is None) or 'NODE+DNS' work item.  The
    # visible return paths yield True once the node has been handed to the
    # node manager and False otherwise.
    # NOTE(review): the body of the following `if` is missing.
    if entry in kSKIP_ENTRIES:
    # original caller passes in original depth, or the requeue decr's
    # before passing back in to compensate
    depth += 1
    with self._hosts_lock:
      if entry in self.seen:
        #debug('Walk: already [%s]', entry)
        if depth < self.seen[entry] and depth >= 0:
          self.seen[entry] = depth
        return False
      if ai is None:
        # No DNS data yet: park the node and ask for resolution first.
        if entry not in self.pending:
          #debug('Walk: deferring until have DNS [%s]', entry)
          self.pending[entry] = depth
          self._manager_dns.add_depthed(depth, entry)
          #debug('Walk: already deferred [%s]', entry)
        return False
      for v in ai.iter_all():
        if v in self.seen:
          #debug('Walk: already [%s] under name [%s]', entry, v)
          return False
        if v in kSKIP_ENTRIES:
          return False
      self.seen[entry] = depth
      for v in ai.iter_all():
        self.seen[v] = depth
      #debug('Walk: new %s', entry)
    self._manager_node.add_depthed(depth, entry)
    return True

  def run(self):
    # Thread body: dispatch queued work items by tag, forever.
    # NOTE(review): the indentation suggests an enclosing `try:` is
    # missing, and as written the raise below runs after every item --
    # presumably the chain was if/elif/else; confirm.
    while True:
        what, depth, entry = self._queue.get()
        #debug('Walk: [%s] [%d] [%s]', what, depth, repr(entry))
        if what == 'DNS':
          self._handle_dns_inthread(depth, entry)
        if what == 'NODE':
          self._handle_node_inthread(depth, entry)
        if what == 'NODE+DNS':
          self._handle_node_inthread(depth, entry[0], entry[1])
        raise InternalUsageError('Unknown queue tag type "%s"' % what)

# NOTE(review): many statements of this class appear to be missing from this
# copy (docstring terminators, try: lines, call arguments, loop bodies); the
# gaps are marked where they are visible.  Confirm against the original.
class DataGatherer(threading.Thread):
  """Master persistent thread responsible for keeping data current.

  This controls all data gathering threads and sleeps between collections.
  """
  def __init__(self, notify_ev, skip_nodechecker=False):
    # notify_ev: event checked by _get_cmd() before a command is read.
    # NOTE(review): threading.Thread.__init__ is not called in the visible
    # lines -- presumably lost; confirm.
    self._notify_ev = notify_ev
    self._skip_nodechecker = skip_nodechecker
    self._cmd_queue = Queue.Queue()
    self.name = 'DataGatherer'
    self.daemon = False

  def _get_cmd(self):
    # Returns None when the notify event is not set, otherwise the queued
    # command ('kill' or 'scan'); an empty queue is logged and treated as
    # 'kill'.
    if not self._notify_ev.is_set():
      return None
    # NOTE(review): a `try:` appears to be missing before the next line.
      cmd = self._cmd_queue.get(block=False)
    except Queue.Empty:
      log('ProgError: notify with empty queue')
      return 'kill'
    if cmd in ('kill', 'scan'):
      return cmd
    raise InternalUsageError('Unknown DG queue cmd {%s}' % cmd)

  def _run(self):
    """The data gathering function.  Does not return until shutdown.

    This manages collection threads, puts the data together in a top-level
    container and replaces the global object with that one, under a lock,
    before sleeping to re-do this.
    """
    global sks_data, sks_data_lock

    walker = RecursingWalkManager()
    # NOTE(review): the trailing arguments and closing parentheses of the
    # two PoolWorkManager(...) calls are missing.
    resolver = PoolWorkManager(
        'Resolve', AddressInfo, kRESOLVE_THREADS,
    node_checker = PoolWorkManager(
        'SksNodeCheck', SksNode, kSKS_POLL_THREADS,
    walker.set_managers(resolver, node_checker)

    # A membership line counts when it starts with an alphanumeric name
    # followed by whitespace and a digit.
    peer_re = re.compile('^([A-Za-z0-9]\S+)\s+\d')
    while True:
      with sks_data_lock:
        if sks_data is not None:
          # Lets the peers page report that a scan is in progress.
          sks_data['updating'] = time.time()
      log('DG: Gathering data')

      start_over = False
      hostnames = set()
      with open(kSKS_MEMBERSHIP) as fn:
        # Kept so the stored mtime can later be compared by the page
        # handler to detect membership changes.
        config_stat = os.fstat(fn.fileno())
        for line in fn:
          m = peer_re.match(line)
          # NOTE(review): the body of this `if`, and the statement adding
          # the matched name to hostnames, are missing.
          if not m:
      self.hostnames = hostnames
      for h in hostnames:
        # NOTE(review): loop body incomplete.
        if not self._skip_nodechecker:

      sizes = {}
      new_data = {}
      keep_checking = True
      final_safety = False
      # Keep collecting until a full pass sees no size change and all
      # queues are empty, then do one extra confirming pass.
      while keep_checking:
        keep_checking = False
        # NOTE(review): the closing `]:` of this list is missing.
        for (target, pooled) in [
            (None, None),
# DNS is fast; we can finish collecting DNS after a subset, stalling on
# the URL retrievals, then continue to walk further on and have more DNS
# to do.  This is a change in integrity semantics with the introduction of
# walking.
            ('node_data', node_checker),
            ('addresses', resolver),
          if target is not None:
            new_data[target] = pooled.collect()
            old_size = sizes.get(target, 0)
            new_size = len(new_data[target])
            if new_size != old_size:
              sizes[target] = new_size
              keep_checking = True

          cmd = self._get_cmd()
          # NOTE(review): the statements that actually leave the loops
          # after these checks are missing.
          if cmd == 'kill':
            log('DG: termination request received (during scan), returning')
          if cmd == 'scan':
            start_over = True
        if len(walker) or len(node_checker) or len(resolver):
          keep_checking = True
# If things completed before that len check, then we'd be incomplete
# If we check "one more time" and find differences, then we need to redo
# But if we saw empty and that last "one more time" saw no changes, then
# we've shown that in the time between seeing no changes and the time
# when the queue was empty, there were no changes because the next time
# around there were no changes either.  So we've covered that interval.
        if keep_checking:
          final_safety = False
        elif not final_safety:
          keep_checking = True
          final_safety = True
# I *think* the above is correct but should look into formally proving it

      if start_over:
        log('DG: early termination of run, starting over')

      # NOTE(review): the statement wrapping the next two entries
      # (presumably adding them to new_data) is missing.
        'mtime': config_stat.st_mtime,
        'gatherer': self,
      # Publish the completed data set atomically.
      with sks_data_lock:
        sks_data = new_data

      # NOTE(review): the randint() arguments are missing.
      delay = float(random.randint(
      log('DG: Gathering data complete, sleeping %.0f seconds', delay)
      sleeping = True
      while sleeping:
        sleep_starting = time.time()

        # NOTE(review): the wait itself, and an `else:` before the
        # `delay -=` line, appear to be missing.
        slept = time.time() - sleep_starting
        if float(slept) > (delay - 1):
          sleeping = False
          delay -= float(slept)

        cmd = self._get_cmd()
        if cmd == 'kill':
          log('DG: termination request received, returning')
        if cmd == 'scan':
          log('DG: received scan request, starting now')
          sleeping = False

  def run(self):
    """Wrapper around _run to collect exceptions safely."""
    global sks_data, sks_data_lock
    # NOTE(review): the `try:` and the call to self._run() are missing.
    except Exception, e:
      # The failure is published so the peers page can display it.
      with sks_data_lock:
        sks_data = { 'exception': e }

  def kill(self):
    """Returns true iff successfully killed."""
    # NOTE(review): nothing visible here requests termination; only the
    # liveness check remains.
    return not self.is_alive()

  def rescan(self, reason=None):
    """Cut short the sleep, scan now."""
    global sks_data, sks_data_lock
    if reason is None:
      reason = 'no reason given for rescan'
    with sks_data_lock:
      # A second request against the same data set is ignored.
      if 'rescan_triggered' in sks_data:
        log('DG/rescan ignoring retrigger [%s]', reason)
        return False
      sks_data['rescan_triggered'] = time.time()
    log('DG/rescan asking for scan [%s]', reason)
    # NOTE(review): the lines that queue the 'scan' command and set the
    # notify event are not visible.
    return True

def gen_page_namespace(**kwdict):
  """Return a page namespace dictionary with standard values filled in.

  Args:
    **kwdict: extra template variables to include.  They are stored as
      given, so callers must HTML-escape them where needed.

  Returns:
    A new dict containing kwdict plus 'maintainer' and 'my_hostname'; the
    two standard entries take precedence over same-named keywords.
  """
  # Start from the caller's values: they used to be accepted and then
  # silently dropped.
  page_namespace = dict(kwdict)
  #TODO: should really URL-encode maintainer too
  page_namespace['maintainer'] = html_escape(kMAINT_EMAIL)
  page_namespace['my_hostname'] = html_escape(kHOSTNAME)
  return page_namespace

# NOTE(review): many `try:` / `else:` lines and a few statements of this
# handler appear to be missing from this copy; the gaps are marked where
# they are visible.  Confirm against the original script.
def handle_peers_page(environ, start_response):
  """Write the peers page.  WSGI."""
  global sks_data, sks_data_lock

  still_gathering = True
  # NOTE(review): a `try:` matching the `except` further down appears to
  # be missing here.
    # Take a reference under the lock, then work on it without the lock.
    with sks_data_lock:
      _sks_data = sks_data
    if _sks_data is None:
      addresses = {}
      node_data = {}
      last_mtime = 0.0
      gatherer = None
      updating_since = None
    elif 'addresses' in _sks_data:
      still_gathering = False
      addresses = _sks_data['addresses']
      node_data = _sks_data['node_data']
    elif 'exception' in _sks_data:
      data_gather_exception = _sks_data['exception']
      addresses = {}
      # NOTE(review): an `else:` appears to be missing before this raise.
      raise InternalUsageError("sks_data is not None but lacks addresses field")
    if _sks_data is not None:
      last_mtime = _sks_data.get('mtime', 0.0)
      gatherer = _sks_data.get('gatherer')
      updating_since = _sks_data.get('updating')
    del _sks_data

    page_namespace = gen_page_namespace()
    page_head = Cheetah.Template.Template(kPAGE_TEMPLATE_HEAD, searchList=[page_namespace])
    page_foot = Cheetah.Template.Template(kPAGE_TEMPLATE_FOOT, searchList=[page_namespace])

    if updating_since is None:
      page_namespace['scanning_active'] = ''
      # NOTE(review): an `else:` appears to be missing before the next
      # two lines.
      scanning_for = time.time() - updating_since
      page_namespace['scanning_active'] = '<h3>Data gathering run in progress (for last %.0f seconds)' % scanning_for
    page_namespace['warning'] = still_gathering and '<h2>Still gathering data!</h2>' or ''
    # data_gather_exception is only bound in the 'exception' branch above.
    if 'data_gather_exception' in locals():
      page_namespace['warning'] = '<h2>DATA GATHERING FAILED</h2><div>%s</div>' % html_escape(data_gather_exception)

    if gatherer is not None:
      if not gatherer.is_alive():
        page_namespace['warning'] += '<h2>DATA GATHERING THREAD DEAD</h2>'

    if addresses:
      # A changed membership file triggers a background rescan.
      config_stat = os.stat(kSKS_MEMBERSHIP)
      if config_stat.st_mtime != last_mtime:
        if gatherer.rescan('membership mtime update'):
          page_namespace['warning'] += '\n<h2>Config changed, will rescan in background</h2>'

    # host_namespace is mutated per row below; the templates render its
    # current contents each time they are converted to str.
    host_namespace = dict()
    host = Cheetah.Template.Template(kPAGE_TEMPLATE_HOST, searchList=[host_namespace, page_namespace])
    hostmore = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTMORE, searchList=[host_namespace, page_namespace])
    hosterr = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTERR, searchList=[host_namespace, page_namespace])
  except Exception, e:
    start_response('500 Error', [('Content-Type', 'text/plain; charset=UTF-8')], sys.exc_info())
    yield 'Bleh, we failed:\n\n'
    yield str(e)
    yield '\n'
    raise StopIteration()

  # NOTE(review): the closing `])` of this call is missing.
  start_response('200 OK', [
    ('Content-Type', 'text/html; charset=UTF-8'),

  # PEP 333 is explicit that WSGI uses strings, not Unicode
  yield str(page_head)

  count_total = 0
  count_direct = 0
  ordered_entries = []
  by_depth = {}  # depth, hostname
  if node_data:
    # Group hosts by walk depth; results without a distance attribute are
    # given depth -1.
    for h, v in node_data.iteritems():
      if hasattr(v, 'distance'):
        depth = int(v.distance)
        # NOTE(review): an `else:` appears to be missing before the next
        # line, and the statement appending h to by_depth[depth] is
        # missing too.
        depth = -1
      if depth not in by_depth:
        by_depth[depth] = []
    if -1 in by_depth:
      # Move the unknown-depth hosts after the deepest known depth.
      shuffle = by_depth[-1]
      del by_depth[-1]
      shuffle_to = sorted(by_depth.keys()).pop() + 1
      by_depth[shuffle_to] = shuffle
    # NOTE(review): the body of this loop and an `else:` before the
    # address-only fallback below are missing.
    for d in sorted(by_depth.keys()):
    count_direct = len(by_depth[0])
    ordered_entries = _sort_hostnames(addresses.keys())
    count_direct = len(addresses)

  for h in ordered_entries:
    host_namespace['hostname'] = html_escape(h)
    info = addresses.get(h, [('n/a', None)])
    sksnode = node_data.get(h, False)
    count_total += 1
    host_namespace['rowcount'] = count_total
    host_namespace['rowclass'] = count_total % 2 and 'odd' or 'even'

    # Resolution failures are stored as exception objects in place of data.
    if isinstance(info, Exception) or not info:
      host_namespace['error'] = str(info)
      yield str(hosterr)
      # NOTE(review): a `continue` appears to be missing here.

    if len(info) > 1:
      host_namespace['rowspan'] = ' rowspan="%d"' % len(info)
      # NOTE(review): an `else:` appears to be missing before the next line.
      host_namespace['rowspan'] = ''
    sks_url = 'http://%s:11371/pks/lookup?op=stats' % h
    html_link = '<a href="%s">%s</a>' % html_interp(sks_url, h)
    host_namespace['sks_url'] = sks_url
    host_namespace['html_link'] = html_link

    if isinstance(sksnode, Exception) or not sksnode:
      host_namespace['mutual'] = 'Err'
      host_namespace['version'] = '?'
      host_namespace['keycount'] = '0'
      if hasattr(sksnode, 'distance'):
        host_namespace['distance'] = str(sksnode.distance)
        # NOTE(review): `else:` lines appear to be missing before the
        # '?' assignment and before the success-path assignments below.
        host_namespace['distance'] = '?'
      host_namespace['mutual'] = sksnode.mutual and 'Yes' or 'No'
      host_namespace['version'] = str(sksnode.version)
      host_namespace['keycount'] = str(sksnode.keycount)
      host_namespace['distance'] = str(sksnode.distance)

    if info.alt_names:
      host_namespace['host_aliases_text'] = \
          ' <span class="host_aliases">%s</span>' % ' '.join(info.alt_names)
      # NOTE(review): an `else:` appears to be missing before the next line.
      host_namespace['host_aliases_text'] = ''

    # One table row per address: the full row first, then short rows.
    first = True
    for item in info:
      host_namespace['ip'] = html_escape(item[0])
      geo = item[1]
      if geo is None:
        host_namespace['geo'] = ''
        # NOTE(review): an `else:` appears to be missing before the next line.
        host_namespace['geo'] = ' '.join(geo).upper()
      if first:
        yield str(host)
        first = False
        # NOTE(review): an `else:` appears to be missing before the next line.
        yield str(hostmore)

  page_namespace['peer_count'] = count_direct
  page_namespace['mesh_count'] = count_total
  yield str(page_foot)

def sks_peers_init(skip_nodechecker=False):
  """Init global state, creating threads as needed.

  On first call, returns an object which has a kill() method, to try to shut
  things down cleanly.  On subsequent calls, returns None.

  Normally we will only ever be called once, but if we're being served as an
  app so that we're launched on-demand then we'll be called via application();
  if we are set to multi-threaded, then there can be concurrent calls to
  application() and thus a race.  Break the race with a lock.

  It's okay to create a lock at top-level, but not threads.

  Args:
    skip_nodechecker: passed through unchanged to DataGatherer.

  Returns:
    The DataGatherer instance on the first call, None on later calls.
  """
  global startup_lock, sks_data, sks_data_lock, wsgi_selector
  with startup_lock:
    # sks_data_lock doubles as the "already initialised" marker.
    if sks_data_lock is not None:
      return None
    sks_data_lock = threading.Lock()
    dg_shutdown = threading.Event()
    # NOTE(review): nothing in this function starts the gatherer explicitly;
    # confirm that DataGatherer launches its own worker when constructed.
    gatherer = DataGatherer(dg_shutdown, skip_nodechecker)
    s = selector.Selector()
    url_handlers = []
    def _add_handler(url, func):
      # Remember every registered URL so that /helpz can list them; the list
      # is shared with the helpz handler and read lazily at request time.
      url_handlers.append(url)
      s.add(url, GET=func)
    # No matter how I hook in, when using wsgiref.simple_server.make_server I
    # need '/' but with mod_wsgi it's ''.
    _add_handler('', handle_peers_page)
    _add_handler('/', handle_peers_page)
    _add_handler('/ip-valid', handle_ip_valid)
    _add_handler('/helpz', get_handler_helpz(url_handlers))
    _add_handler('/threadz', handle_threadz)
    _add_handler('/environz', handle_environz)
    _add_handler('/rescanz', handle_rescanz)
    _add_handler('/internalz', handle_internalz)
    s.status404 = return_error
    wsgi_selector = s
    syslog.openlog(kSYSLOG_NAME, syslog.LOG_PID, kSYSLOG_FACILITY)
    signal.signal(signal.SIGUSR1, act_reload_queue)
    log('Initialised (uid=%d)', os.getuid())
    return gatherer

# Render the kPAGE_TEMPLATE_BADUSER Cheetah template as an HTML error
# response.  Written as a generator: it yields the rendered page once.
#
# Besides direct calls, this is installed as the selector's 404 handler
# (s.status404 in sks_peers_init()), and application() calls it with an
# explicit summary, error text and the status code '500'.
#
# NOTE(review): this block looks truncated and will not parse as it stands:
#   * the parameter list is never closed, and `code` (used in the status
#     line below) is not declared in the visible signature;
#   * the argument list of gen_page_namespace( is missing;
#   * the header list passed to start_response( is never closed.
# The missing text cannot be recovered from this file alone; restore it from
# the upstream script rather than guessing.
def return_error(environ, start_response,
    summary='Page not found',
    error='The URL which you entered is invalid',
  page_namespace = gen_page_namespace(
  error_page = Cheetah.Template.Template(kPAGE_TEMPLATE_BADUSER, searchList=[page_namespace])
  # The status line is "<code> nope", e.g. "500 nope" for application()'s call.
  start_response('%s nope' % code, [
    ('Content-Type', 'text/html; charset=UTF-8'),
  yield str(error_page)
  # NOTE(review): raising StopIteration inside a generator becomes a
  # RuntimeError on Python 3.7+ (PEP 479); a bare `return` is the portable form.
  raise StopIteration()

def get_handler_helpz(registered):
  """Build the WSGI handler which lists the registered URLs as HTML links.

  Args:
    registered: iterable of URL path strings.  It is read on every request,
      not when the handler is built, so entries added later still show up.

  Returns:
    A WSGI application callable (a generator function).
  """
  def _handle_helpz(environ, start_response):
    start_response('200 OK', [
      ('Content-Type', 'text/html; charset=UTF-8'),
      ])
    # Work out the mount point so that the links we emit are absolute: keep
    # whatever precedes '/helpz' in SCRIPT_NAME, otherwise use no prefix.
    prefix = environ.get('SCRIPT_NAME', '')
    if prefix:
      end = prefix.rfind('/helpz')
      if end >= 0:
        prefix = prefix[:end]
      else:
        prefix = ''
    else:
      prefix = ''
    # NOTE(review): the page opening was missing from this block (only the
    # closing tags survived); this minimal opening keeps the markup balanced.
    yield '<html><head><title>helpz</title></head><body><ul>\n'
    for x in sorted(registered):
      if not x:
        # '' is the mod_wsgi spelling of '/', which is listed separately.
        continue
      url = html_escape(prefix + x)
      yield '<li><a href="%s">%s</a></li>' % (url, url)
    yield '</ul></body></html>\n'
  return _handle_helpz

def handle_threadz(environ, start_response):
  """WSGI handler: list the threads of this process as text/plain.

  Emits a heading, then one line per thread (name, ' DEAD' if not alive,
  ' [ident]' if it has one, ' <non-daemon>' if it is not a daemon thread),
  then a terminating '.' line.
  """
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  yield 'Threads active:\n\n'
  # Thread objects have no useful ordering of their own (arbitrary on
  # Python 2, a TypeError on Python 3), so sort by name for a stable listing.
  for t in sorted(threading.enumerate(), key=lambda th: th.name):
    aliveness = '' if t.is_alive() else ' DEAD'
    ident = ' [%d]' % t.ident if t.ident else ''
    daemonic = '' if t.daemon else ' <non-daemon>'
    yield '%s%s%s%s\n' % (t.name, aliveness, ident, daemonic)
  yield '.\n'

def handle_environz(environ, start_response):
  """WSGI handler: dump the request environ as text/plain.

  Emits one 'key = {value}' line per environ entry, sorted by key, followed
  by a terminating '.' line.
  """
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  for pair in sorted(environ.items()):
    # pair is a (key, value) 2-tuple, so it fills both %s slots.
    yield '%s = {%s}\n' % pair
  yield '.\n'

def handle_rescanz(environ, start_response):
  """WSGI handler: request a rescan and report, as text/plain, what happened.

  Calls act_reload_queue(); when that reports the rescan was not triggered,
  also explains why as far as the shared sks_data snapshot can tell.
  """
  global sks_data, sks_data_lock
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  if act_reload_queue(note='rescanz hit'):
    yield 'rescan triggered\n'
  else:
    yield 'rescan not triggered\n'
    # Take a reference under the lock, then inspect it without holding it.
    with sks_data_lock:
      d = sks_data
    if d is None:
      yield 'First scan not completed, have no sks_data\n'
    elif 'rescan_triggered' in d:
      timediff = time.time() - d['rescan_triggered']
      yield 'Rescan in progress, triggered %.0f seconds ago\n' % timediff
    else:
      yield 'Reason unknown\n'

def handle_internalz(environ, start_response):
  """WSGI handler: dump this module's globals as an indented text/plain tree.

  Returns a list of lines rather than yielding them.  Failures while walking
  any one entry are reported inline instead of aborting the whole dump.
  """
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  # NOTE(review): the second half of the original "module is '__main__' or
  # ..." test was missing; matching this module's own name is assumed here,
  # so that the dump also works when the script is loaded under another name.
  local_modules = ('__main__', __name__)

  def _report_obj(depth, d, name):
    """Return the report lines for dict d, indented for nesting level depth."""
    prefix = '  ' * depth + '* '
    results = []
    try:
      for i in sorted(d.keys()):
        n = (name + '[' + i + ']') if name else i
        try:
          value = d[i]
          if isinstance(value, dict) and not i.startswith('__'):
            results.append(prefix + n + ':\n')
            results.extend(_report_obj(depth + 1, value, n))
          elif isinstance(value, (set, list)) and len(value) > 1:
            results.append(prefix + n + ':\n')
            indent = ' ' * (len(prefix) + 2)
            # textwrap drops trailing newline, even if in input data
            results.append(textwrap.fill(' '.join(map(str, value)),
                initial_indent=indent, subsequent_indent=indent) + '\n')
          else:
            results.append(prefix + n + ': ' + repr(value) + '\n')
            if value.__class__.__module__ in local_modules:
              # Instance of one of our own classes: show its attributes too.
              results.extend(_report_obj(depth + 1, value.__dict__, n))
            elif isinstance(value, type) and value.__module__ == '__main__':
              # One of our own classes: show its non-dunder attributes.
              results.extend(_report_obj(depth + 1,
                dict([(x, getattr(value, x)) for x in dir(value) if not x.startswith('__')]),
                n))
        except Exception as e:
          results.append('Walking dict %s failed: %s\n' % (n, e))
    except Exception as e:
      results.append('Iter dict failed %s\n' % name)
    return results

  g = globals()
  return [x for x in _report_obj(1, g, '') if isinstance(x, str)]

def handle_ip_valid(environ, start_response):
  """Emit a list of IPs, first line status/control, last line '.'

  The first line is 'IP-Gen/1: status=... count=...'; on success it is
  followed by one IP per line.  Only servers which are not version 1.0.10,
  and whose key count is close to that of the best servers, are listed.
  """
  global sks_data, sks_data_lock

  # Take a reference under the lock, then work on it without holding it.
  with sks_data_lock:
    data = sks_data

  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])

  if data is None or 'node_data' not in data or not len(data['node_data']):
    yield 'IP-Gen/1: status=INVALID count=0 reason=first_scan\n.\n'
    return

  ips_all = {}
  for name, node in data['node_data'].items():
    try:
      ip_geo_list = data['addresses'][name]
      if node and str(node.version) != '1.0.10' and int(node.keycount) > 1:
        for ip, geo in ip_geo_list:
          ips_all[ip] = int(node.keycount)
    except (KeyError, AttributeError, TypeError, ValueError):
      # Best effort: a host with no address record, or whose record is an
      # error placeholder rather than real data, is simply not a candidate.
      continue

  if not ips_all:
    # Scan data exists but nothing in it is usable; there is no mode to find.
    yield 'IP-Gen/1: status=INVALID count=0 reason=broken_data\n.\n'
    return

  # We want to discard statistic-distorting outliers, then of what remains,
  # discard those too far away from "normal", but we really want the "best"
  # servers to be our guide, so 1 std-dev of the second-highest remaining
  # value should be safe; in fact, we'll hardcode a limit of how far below.
  # To discard, find mode size (knowing that value can be split across two
  # buckets) and discard more than five stddevs from mode.  The bucketing
  # should be larger than the distance from desired value so that the mode
  # is only split across two buckets, if we assume enough servers that a
  # small number will be down, most will be valid-if-large-enough, so that
  # splitting the count across two buckets won't let the third-best value win
  buckets = {}
  for ip, count in ips_all.items():
    bucket = int(count // 3000)
    if bucket not in buckets:
      buckets[bucket] = []
    buckets[bucket].append(count)
  # The mode is the most populated bucket, not the bucket with the highest
  # key: a lone server reporting an absurd key count must not become the
  # reference.  Ties go to the higher key count, so the result is stable.
  largest_bucket = max(buckets, key=lambda b: (len(buckets[b]), b))
  first_n = len(buckets[largest_bucket])
  # Float arithmetic: integer division would truncate the mean and could
  # collapse the standard deviation to zero.
  first_mean = float(sum(buckets[largest_bucket])) / first_n
  first_sd = sqrt(sum((x-first_mean)**2 for x in buckets[largest_bucket]) / first_n)
  first_bounds = (int(first_mean - 5*first_sd), int(first_mean + 5*first_sd))

  first_ips = dict([(x, ips_all[x]) for x in ips_all
                    if first_bounds[0] <= ips_all[x] <= first_bounds[1]])
  second_mean = float(sum(first_ips.values())) / len(first_ips)

  if second_mean < kIPLIST_SANITY_MIN:
    yield 'IP-Gen/1: status=INVALID count=0 reason=broken_data\n.\n'
    return

  # Second-highest surviving count, or the only one if there is just one.
  threshold = sorted(first_ips.values())[-2:][0] - kIPLIST_NEED_THRESHOLD
  ips = [x for x in first_ips if first_ips[x] >= threshold]

  count = len(ips)
  log('ip-valid: Yielding %d of %d values', count, len(ips_all))
  yield 'IP-Gen/1: status=COMPLETE count=%d tags=skip_1010\n' % count
  yield '\n'.join(ips)
  yield '\n.\n'

def application(environ, start_response):
  """The main entry point for serving as an application.

  WSGI interface.

  Will init global context if not already init'd.
  """
  global sks_data_lock, wsgi_selector

  if sks_data_lock is None:
    # We're embedded in a web-server and need to bootstrap
    sks_peers_init()

  if wsgi_selector is None:
    return return_error(environ, start_response,
        'Internal problem', 'Failed to initialise correctly, please report this', '500')

  # Strip our own mount point from REQUEST_URI before handing the request to
  # the selector; an empty remainder is treated as '/'.
  i_am = environ.get('SCRIPT_NAME', None)
  if i_am is not None:
    t = environ.get('REQUEST_URI','').replace(i_am, '', 1)
    if not t:
      t = '/'
    environ['REQUEST_URI'] = t
  return wsgi_selector(environ, start_response)

def main(argv=None):
  """Entry point when invoked as a normal binary.

  Args:
    argv: command-line arguments without the program name.  An optional
      leading 'standalone' selects the built-in web server; an optional
      'nodata' initialises with skip_nodechecker=True; in standalone mode the
      remaining arguments are 'host [port]' or ':port'.

  Returns:
    0 if the background machinery shut down cleanly, 1 otherwise.

  Raises:
    InternalUsageError: if initialisation returned no shutdown object.
  """
  global sks_data, sks_data_lock, wsgi_selector

  # Work on a copy: the list is shifted and edited below.
  argv = list(argv) if argv is not None else []

  standalone = bool(argv) and argv[0] == 'standalone'
  if standalone:
    from wsgiref.simple_server import make_server
    # NOTE(review): the mode word is dropped here so that argv[0] can be read
    # as the host name below; that shift was not present in the damaged text.
    argv.pop(0)

  if len(argv) and argv[0] == 'nodata':
    shutdown = sks_peers_init(skip_nodechecker=True)
    argv.pop(0)
  else:
    shutdown = sks_peers_init()
  if shutdown is None:
    raise InternalUsageError('init failed to return a shutdown object')

  if standalone:
    server_host = 'localhost'
    port = 8080

    try:
      if len(argv) >= 1:
        if argv[0].startswith(':'):
          # ':port' on its own means "default host, this port".
          argv.insert(0, 'localhost')
          argv[1] = argv[1][1:]
        else:
          server_host = argv[0]

      if len(argv) >= 2:
        port = int(argv[1])
      if port < 2 or port > 65535:
        raise OverflowError('port out of range')
    except ValueError:
      # port still holds the default, since the assignment never happened.
      sys.stderr.write('Unable to parse as valid port: %s\n' % argv[1])
    except OverflowError:
      sys.stderr.write('Port number %d out of range, resetting\n' % port)
      port = 8080

    server = make_server(server_host, port, wsgi_selector)
    sys.stderr.write('Starting up webserver listening on {%s} %d\n' % (
        server_host, port))
    try:
      server.serve_forever()
    except KeyboardInterrupt:
      sys.stderr.write('Received keyboard interrupt, shutting down\n')
      if not shutdown.kill():
        sys.stderr.write('Shutdown failed, aargh.\n')
        return 1
    return 0

  else:
    import wsgiref.handlers
    # Wait for the first scan to publish its data before answering.
    # NOTE(review): the poll interval and the single CGI-style request below
    # are reconstructed; confirm them against the upstream script.
    scanning = True
    while scanning:
      with sks_data_lock:
        _sks_data = sks_data
      if _sks_data is None:
        time.sleep(1)
      else:
        scanning = False
    wsgiref.handlers.CGIHandler().run(application)
    if shutdown.kill():
      return 0
    return 1

if __name__ == '__main__':
  # main() expects the arguments without the program name, and its return
  # value is used as the process exit status.
  sys.exit(main(sys.argv[1:]))

# vim: set filetype=python sw=2 expandtab :

