On 2009-03-24 at 11:57 -0400, Daniel Kahn Gillmor wrote:
> Are these scripts published? (this is out of curiosity more than
> anything else)

Well, since it's just curiosity, I'll pipe in with something roughly
equivalent to the IP generation side:

  http://sks.spodhuis.org/sks-peers/ip-valid

No DNS integration, but it provides the data in a format which should be
easy to script into DNS zonefiles.  The "no more than 500 behind" value
is chosen to match that of sks-keyservers.net, but the reference value
is derived with some fast and loose stats, instead of being the local
value.  Search for "def handle_ip_valid" in the script.
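
For the DNS side, here's a rough sketch of what consuming that output
could look like (this is not what sks-keyservers.net runs; the zone
label and TTL are made up for illustration):

  import urllib2
  LABEL = 'pool.example.org.'  # hypothetical; substitute your own zone
  raw = urllib2.urlopen('http://sks.spodhuis.org/sks-peers/ip-valid').read()
  lines = raw.splitlines()
  # only trust a COMPLETE result which is properly "."-terminated
  if lines[0].split()[1] == 'status=COMPLETE' and lines[-1] == '.':
    for ip in lines[1:-1]:
      rrtype = ':' in ip and 'AAAA' or 'A'
      print '%s 300 IN %s %s' % (LABEL, rrtype, ip)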

This is generated by the attached Python 2.6 script, launched from
Apache/mod_wsgi with:
  <IfModule mod_wsgi.c>
    WSGIDaemonProcess   sks.spodhuis.org user=sks display-name=%{GROUP}
    WSGIProcessGroup    sks.spodhuis.org

    WSGIScriptAlias     /sks-peers "/www/sites/sks.spodhuis.org/wsgi/sks_peers.py"
  </IfModule>

This is a script/module rooted at 'sks-peers'; you'll want to change the
first few kCONSTANTS in the script for local values -- especially to
grant yourself, not me, privileged access to the tool (to see
internals).  /sks-peers/helpz will give you links to the various admin
URLs; they're read-only views of the data, except for rescanz.

You can also run it without mod_wsgi; just invoke it as:
  ./sks_peers.py standalone
and look at http://localhost:8080/ip-valid instead.

I think the Python 2.6-isms are fairly few, but they're there
(numbers.Integral, etc).  You need some non-standard Python modules, all
in easy reach with easy_install; they're the second batch of imports, and
the only one whose package name isn't just the import's top-level module
name (dnspython) has a comment indicating the name.
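
For reference, grabbing the dependencies was roughly (package names
from memory, so treat these as assumptions to verify):

  easy_install BeautifulSoup Cheetah dnspython ipaddr selector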

On the first retrieval the script will go gather data and report back
"no data yet"; it's persistent past that first retrieval, setting up a
data-gathering thread which periodically re-polls, and rescanning when
the 'membership' file changes.

After about 30 seconds (when the bad server URL retrievals time out)
you'll have data.  /sks-peers is a table view in HTML with links, etc.
It's ugly as I'm not a UI designer.  /sks-peers/ip-valid reports data in
a format that should be fairly comprehensible:
  status line, including a count
  one IP address per line; number of lines given by count in first line
  a line containing just "."

Working output starts:
  IP-Gen/1: status=COMPLETE count=29 tags=skip_1010
and non-working might be:
  IP-Gen/1: status=INVALID count=0 reason=first_scan
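
If you script against this, a minimal parser might look something like
this (my sketch, not a blessed client):

  def parse_ip_valid(text):
    """Return the list of IPs, or raise ValueError if unusable."""
    lines = text.splitlines()
    if not lines or lines[-1] != '.':
      raise ValueError('missing "." terminator')
    fields = dict(f.split('=', 1) for f in lines[0].split()[1:])
    if fields.get('status') != 'COMPLETE':
      raise ValueError('not usable: %s' % fields.get('reason', 'unknown'))
    ips = lines[1:-1]
    if len(ips) != int(fields['count']):
      raise ValueError('count mismatch')
    return ips

On anything other than status=COMPLETE, keep whatever list you last
generated rather than emptying your zone.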

Feel free to adapt for your own use (but credit is nice); no warranties,
etc etc.  At your own risk.

-Phil
#!/usr/local/bin/python2.6
#
# $HeadURL: https://svn.spodhuis.org/ksvn/websites/trunk/sks.spodhuis.org/wsgi/sks_peers.py $
# $Id: sks_peers.py 134 2009-03-24 21:32:47Z XXX $
#
# Author: Phil Pennock
# Copyright 2009; use/modify/copy freely with attribution.
# No warranties whatsoever; only use if you're able to assess this code.
#
"""
WSGI application to server up a table of peers of this server.
Main entry point is 'application', if 'main' invoked then can choose
to start a standalone webserver.
"""

import gc
from math import sqrt
import numbers
import os
import Queue
import random
import re
import signal
import socket
import sys
import syslog
import textwrap  # purely for /internalz
import threading
import time
import urllib2

from BeautifulSoup import BeautifulSoup
import Cheetah.Template
import dns.resolver     # dnspython
import ipaddr
import selector

sks_data = None
sks_data_lock = None
startup_lock = threading.Lock()  # see sks_peers_init() for rationale
wsgi_selector = None

kPRIVILEGED_ACCESS = ('94.142.240.6',
                      '94.142.241.88/29',
                      '2a02:898:0:30::31:1',
                      '2a02:898:31::/48')
kHOSTNAME = 'sks.spodhuis.org'
kHOSTNAME_ALT = 'sks-peer.spodhuis.org'
kMAINT_EMAIL = 'webmas...@spodhuis.org'
kSKS_MEMBERSHIP = '/var/sks/membership'
kRESOLVE_THREADS = 20
kSKS_POLL_THREADS = 20
kRECON_PORT = 11370
kHKP_PORT = 11371
kSTATS_FETCH_TIMEOUT = 30
kCOUNTRIES_ZONE = 'zz.countries.nerd.dk.'
kIPLIST_SANITY_MIN = 2600000
kIPLIST_NEED_THRESHOLD = 500  # use the sks-keyservers.net value
kINTER_SCAN_INTERVAL_SECS = 6 * 3600
kINTER_SCAN_INTERVAL_JITTER = 120
kSYSLOG_NAME = 'sks-peers'
kSYSLOG_FACILITY = syslog.LOG_DAEMON
kSKIP_ENTRIES = ('localhost', '127.0.0.1', '::1')

# This one may *not* contain any variables or directives as it may be used
# by quick & dirty handlers, rather than the template system
kPAGE_TEMPLATE_BASIC_HEAD = \
"""<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
 <head>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
  <meta http-equiv="Content-Script-Type" content="application/ecmascript; charset=UTF-8">
  <meta http-equiv="Content-Style-Type" content="text/css; charset=UTF-8">
  <meta http-equiv="imagetoolbar" content="no"> <!-- MSIE control -->
  <link rel="stylesheet" href="/style/spodhuis-sks.css" type="text/css" charset="utf-8">
  <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
"""

kPAGE_TEMPLATE_BADUSER = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp
  <link rev="made" href="mailto:$maintainer">
  <title>$summary</title>
 </head>
 <body>
  <h1>$summary</h1>
  <div class="usererror">$error</div>
 </body>
</html>
"""

kPAGE_TEMPLATE_HEAD = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp
  <link rev="made" href="mailto:$maintainer">
  <title>$my_hostname Peer Mesh</title>
 </head>
 <body>
  <h1>$my_hostname Peer Mesh</h1>
$warning
$scanning_active
  <div>
   Entries at depth 0 are direct peers.  Others are seen by spidering the peers.
  </div>
  <table class="sks peertable">
   <thead><tr><th>Host</th><th>IP</th><th>Geocoding</th><th>Mutual</th><th>Version</th><th>Keys</th><th>Distance</th></tr></thead>
   <tbody>
"""

kPAGE_TEMPLATE_FOOT = """#slurp
   <caption>SKS has $peer_count peers of $mesh_count visible</caption>
  </table>
 </body>
</html>
"""

kPAGE_TEMPLATE_HOST = """#slurp
   <tr class="peer host $rowclass">
    <td class="hostname"$rowspan>$html_link$host_aliases_text</td>
    <td class="ipaddr">$ip</td>
    <td class="location">$geo</td>
    <td class="mutual"$rowspan>$mutual</td>
    <td class="version"$rowspan>$version</td>
    <td class="keys"$rowspan>$keycount</td>
    <td class="peer_distance"$rowspan>$distance</td>
   </tr>
"""

kPAGE_TEMPLATE_HOSTERR = """#slurp
   <tr class="peer host failure $rowclass">
    <td class="hostname">$hostname</td>
    <td class="exception" colspan="6">Error: $error</td>
   </tr>
"""

kPAGE_TEMPLATE_HOSTMORE = """#slurp
   <tr class="peer more">
    <td class="ipaddr">$ip</td><td class="location">$geo</td>
   </tr>
"""


class Error(Exception):
  """Generic top-level error exception for this tool."""
  pass


class InternalUsageError(Error):
  """We screwed the pooch so badly, we don't even call ourselves right."""
  pass


def debug(msg='?', *args):
  syslog.syslog(syslog.LOG_DEBUG, msg % args)

def log(msg='?', *args):
  syslog.syslog(syslog.LOG_INFO, msg % args)

def suicide_delayed(delay=3):
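  # Used when the data-gathering thread is found dead: terminate this
  # daemon process shortly, so that mod_wsgi (or whatever supervises us)
  # can respawn a fresh one.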
  def kill_me():
    time.sleep(delay)
    os.kill(os.getpid(), signal.SIGTERM)
  threading.Thread(target=kill_me).start()

# signal handler params must come first
def act_reload_queue(signum=None, frame=None, note=None):
  global sks_data_lock, sks_data
  with sks_data_lock:
    if sks_data is None:
      return False
    if 'gatherer' not in sks_data:
      return False
    g = sks_data['gatherer']
  if note is None and signum is not None:
    note = 'Signal %d received' % signum
  return g.rescan(note)

def html_escape(s):
  """HTML Entity escaping."""
  return str(s).replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

def html_interp(*params):
  """HTML Entity escape each param, returning results as single tuple.

  Useful for interpolation by providing this as single % param.
  """
  return tuple(map(html_escape, params))

def wrap_check_privileged(handler):
  """WSGI method decorator, check authorisation to access."""
  def _wsgi_hook(environ, start_response):
    ip = environ.get('REMOTE_ADDR', None)
    if ip:
      ip = ipaddr.IP(ip)
    else:
      return return_error(environ, start_response)
    # Only IPv4 objects have .IsLoopback() -- *sigh* factories returning
    # objects with inconsistent APIs
    if hasattr(ip, 'IsLoopback'):
      if ip.IsLoopback():
        return handler(environ, start_response)
    elif isinstance(ip, ipaddr.IPv6):
      loop = ipaddr.IP('::1')
      if ip == loop:
        return handler(environ, start_response)
    for block in kPRIVILEGED_ACCESS:
      net = ipaddr.IP(block)
      if net.Contains(ip):
        return handler(environ, start_response)
    return return_error(environ, start_response)
  return _wsgi_hook

def _sort_hostnames(iterable):
  """Sort the iterable parameter as hostnames, least-specific label to most."""
  keyed = []
  for entry in iterable:
    key = entry.split('.')
    key.reverse()
    keyed.append( (key, entry) )

  def _cmp_keyed(a, b):
    return cmp(a[0], b[0]) or cmp(a[1], b[1])

  for entry in sorted(keyed, cmp=_cmp_keyed):
    yield entry[1]


class AddressInfo(object):
  """Information about the IP addressing of an SKS peer."""
  _all_lock = threading.Lock()
  _all_ai = []
  NEW_ITEM = 1
  NEW_NAME = 2
  OLD_ITEM = 3

  @classmethod
  def find_address_info(cls, obj, insert=False):
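    # Returns the already-registered AddressInfo sharing any IP address
    # with obj, else None; with insert=True, obj is registered (under
    # the class lock) when no overlap is found.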
    newips = [x[0] for x in obj]
    #debug('AI.fai(%s): have IPs: %s', obj.hostname, ' '.join(newips))
    with cls._all_lock:
      for ai in cls._all_ai:
        for ip in newips:
          if ip in ai.addresses:
            return ai
      if insert:
        cls._all_ai.append(obj)
    return None

  @classmethod
  def register_address_info(cls, obj):
    existing = cls.find_address_info(obj, insert=True)
    if not existing:
      return cls.NEW_ITEM
    added_name = existing.add_alt_name(obj.hostname)
    if added_name:
      return cls.NEW_NAME
    return cls.OLD_ITEM

  """Information about the IP addressing of an SKS peer."""
  def __init__(self, hostname, walk_state, walker_cb=None, **kwargs):
    self.hostname = hostname
    self.portstr = '%d' % kRECON_PORT
    self._walk_state = walk_state
    self._walker_cb = walker_cb
    self.addresses = list()
    self.alt_names = list()
    self.geography = dict()
    #debug('AddressInfo(%s): created', hostname)
    self.update_lock = threading.Lock()
    with self.update_lock:
      self.resolve_ips()
      # resolve geography before registering, to populate the dict and
      # keep iteration sane below
      self.resolve_geographies()
      status = self.__class__.register_address_info(self)
      self._walker_cb(self._walk_state, (self.hostname, self, status))

  @classmethod
  def _resolve_txt(cls, searchname):
    answers = dns.resolver.query(searchname, 'TXT')
    if not answers:
      return []
    res = []
    for rdata in answers:
      res.extend(rdata.strings)
    return res

  @classmethod
  def _resolve_ip_geography(cls, ip):
    # This sucks, but even Google's ipaddr doesn't have a 'reverse'
    # method suitable for use with DNS
    if ip.find(':') != -1:
      return None
    # XXX: Assumes either IPv6 or IPv4
    search = ip.split('.')
    search.reverse()
    search = '.'.join(search) + '.' + kCOUNTRIES_ZONE
    return list(cls._resolve_txt(search))

  def add_alt_name(self, newname):
    with self.update_lock:
      if newname == self.hostname:
        return False
      if newname in self.alt_names:
        return False
      if newname in self.addresses:
        return False
      self.alt_names.append(newname)
      return True

  def resolve_ips(self):
    addrs = []
    for (family, socktype, proto, canonname, sockaddr) in socket.getaddrinfo(
        self.hostname, self.portstr,
        socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP,
        socket.AI_ADDRCONFIG | socket.AI_NUMERICSERV):
      (addr, port) = socket.getnameinfo(sockaddr,
          socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
      addrs.append(addr)
    self.addresses.extend(addrs)

  def resolve_geographies(self):
    for ip in self.addresses:
      self.geography[ip] = self._resolve_ip_geography(ip)

  def __repr__(self):
    return 'AddressInfo(%s)' % self.hostname

  def __len__(self):
    return len(self.addresses)

  def __iter__(self):
    for ip in self.addresses:
      yield (ip, self.geography[ip])

  def iter_all(self):
    yield self.hostname
    for x in self.alt_names:
      yield x
    for x in self.addresses:
      yield x


class SksNode(object):
  """Information retrieved from an SKS peer."""
  def __init__(self, hostname, walk_state, walker_cb=None, **kwargs):
    self.hostname = hostname
    self.portstr = '%d' % kHKP_PORT
    self._url = 'http://%s:%s/pks/lookup?op=stats' % (self.hostname, self.portstr)
    self.exception = None
    self._walk_state = walk_state
    self.distance = isinstance(walk_state, numbers.Integral) and str(walk_state) or '-1'
    self._walker_cb = walker_cb
    #debug('SksNode(%s): created', hostname)
    self.fetch_page()
    self.analyse()

  def fetch_page(self):
    try:
      self._data = urllib2.urlopen(self._url, timeout=kSTATS_FETCH_TIMEOUT).read()
    # urllib2 has URLError and HTTPError, subclassing from IOError; however,
    # it uses httplib internally which tends to throw BadStatusLine; all of
    # httplib's exceptions subclass from a local HTTPException class
    except (IOError, urllib2.httplib.HTTPException), e:
      self.exception = e

  def _table_following(self, search):
    s = self._soup.find(text=search).parent.nextSibling
    # straight s.name fails for text as it has no name attribute
    while getattr(s, 'name', '') != u'table':
      s = s.nextSibling
    return s

  def _plain_rows_of(self, search):
    table = self._table_following(search)
    return set([str(x.contents[0].strip()) for x in table.findAll('td')])

  def _dict_from_plain_rows(self, search):
    table = self._table_following(search)
    # one "hostname port" pair per cell; split at most once so a stray
    # extra token can't break dict()
    return dict([x.contents[0].strip().split(None, 1) for x in table.findAll('td')])

  def _kvdict_from_table(self, search):
    table = self._table_following(search)
    d = dict()
    for r in table.findAll('tr'):
      pair = [x.contents[0].strip() for x in r.findAll('td')]
      d[pair[0].rstrip(':')] = pair[1]
    return d

  def analyse(self):
    if self.exception is not None:
      return
    try:
      self._soup = BeautifulSoup(self._data)
    except Exception, e:
      self.exception = e
      return
    self.peers = self._dict_from_plain_rows(u'Gossip Peers')
    if self._walker_cb:
      for peer in self.peers:
        if peer in kSKIP_ENTRIES:
          #debug('Analyse(%s): skipping blacklisted item {%s}', self.hostname, peer)
          pass
        else:
          self._walker_cb(self._walk_state, str(peer))
    else:
      debug('Analyse(%s): no walker', self.hostname)
    self.mutual = False
    for me in (kHOSTNAME, kHOSTNAME_ALT):
      try:
        if me in self.peers and int(self.peers[me]) == kRECON_PORT:
          self.mutual = True
      except ValueError, e:
        pass
    self.mailsync = self._plain_rows_of(u'Outgoing Mailsync Peers')
    self._settings = self._kvdict_from_table(u'Settings')
    # settings should have: Hostname Version 'HTTP port' 'Recon port' 'Debug level'
    self.version = self._settings['Version']
    try:
      t = str(self._soup.find(text=u'Statistics').parent.nextSibling.string)
      if t.startswith('Total number of keys'):
        self.keycount = int(t.split(':')[1])
      else:
        self.keycount = -1
    except (AttributeError, ValueError), e:
      self.keycount = -1
    del self._data, self._soup

  def __nonzero__(self):
    if self.exception:
      return False
    return True

  def __str__(self):
    if self.exception:
      return str(self.exception)
    try:
      return str(self._settings['Hostname'])
    except Exception, e:
      return self.hostname


class PoolWorkerFactory(object):
  """Make generic pool workers."""

  @classmethod
  def make(cls, name, worker_class):
    """Our factory entry point.  Generate a PoolWorker."""

    def run_init(self, index, inq, outq, walker_cb):
      """PoolWorker __init__."""
      threading.Thread.__init__(self)
      self.name = '%s[%d]' % (name, index)
      self.daemon = True
      self._in_queue = inq
      self._out_queue = outq
      self._walker_cb = walker_cb

    def do_worker(self, input_data, depth):
      """Invoke the wrapper class's constructor for the current data."""
      try:
        return worker_class(input_data, walk_state=depth, walker_cb=self._walker_cb)
      except Exception, e:
        return e

    def do_run(self):
      """The thread run() method for the constructed classes."""
      while True:
        depth, data = self._in_queue.get()
        self._out_queue.put( (data, self._do_worker(data, depth=depth)) )
        self._in_queue.task_done()

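    # Build a distinct Thread subclass at runtime with type(name, bases,
    # dict); giving each pool its own class name keeps /threadz output
    # and tracebacks readable, unlike one generic worker class.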
    d = dict()
    d['__init__'] = run_init
    d['_do_worker'] = do_worker
    d['run'] = do_run
    return type(name, (threading.Thread,), d)


class PoolWorkManager(object):
  """Run workers asynchronously, emitting results in a queue.

  Eg, with the 'Resolve' pool, we create the resolver, pass it hostnames,
  and pull out (hostname, AddressInfo) tuples in non-deterministic order.
  """

  # There's not much point to getting a shutdown Event too, as we can't have
  # timeouts in Queue obj.join() so no way to sensibly interrupt a collect.
  def __init__(self, worker_name, data_class, thread_count, walker_cb=None):
    if not isinstance(thread_count, numbers.Integral) or thread_count < 1:
      raise InternalUsageError("Not a positive integer: %s" % str(thread_count))
    worker_class = PoolWorkerFactory.make(worker_name, data_class)

    self._in_queue = Queue.Queue()
    self._out_queue = Queue.Queue()
    self._threads = list()
    self._previous_results = {}
    for i in range(0, thread_count):
      t = worker_class(i, self._in_queue, self._out_queue, walker_cb)
      t.start()
      self._threads.append(t)

  def __len__(self):
    """Number of results still to be returned.  Includes pending."""
    return self._in_queue.qsize() + self._out_queue.qsize()

  def add_depthed(self, depth, query):
    self._in_queue.put((depth, query))

  def add(self, query):
    self.add_depthed(0, query)

  def join(self):
    self._in_queue.join()

  def get(self):
    res = self._out_queue.get()
    self._out_queue.task_done()
    return res

  def collect(self):
    results = self._previous_results
    self.join()
    while self:
      (k, d) = self.get()
      results[k] = d
    self._previous_results = results
    return results


class RecursingWalkManager(threading.Thread):
  """Handle requests to see more data."""

  def __init__(self):
    threading.Thread.__init__(self)
    self.name = 'RecursingWalkManager'
    self.daemon = True
    self._queue = Queue.Queue()
    self._have_managers = threading.Semaphore(0)
    self.okay_next_time = set()
    self.seen = { kHOSTNAME: 0, kHOSTNAME_ALT: 0 }  # host -> depth
    self.pending = dict()  # host -> depth
    self._hosts_lock = threading.RLock()
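    # Queue entries are tagged tuples:
    #   ('DNS', depth, (host, AddressInfo, status))  - from the resolver pool
    #   ('NODE', depth, hostname)                    - peer seen on a stats page
    #   ('NODE+DNS', depth, (hostname, ai))          - requeued once DNS is known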

  def __len__(self):
    return self._queue.qsize()

  def set_managers(self, manager_dns, manager_node):
    """Sets the manager objects for handling work dispatch."""
    self._manager_dns = manager_dns
    self._manager_node = manager_node
    self._have_managers.release()

  def add_dns(self, depth, entry):
    self._queue.put(('DNS', depth, entry))

  def add_node(self, depth, entry):
    self._queue.put(('NODE', depth, entry))

  def set_seen(self, name, depth=0):
    """We have seen an entry with this name, at this depth."""
    with self._hosts_lock:
      if name not in self.seen:
        self.seen[name] = depth

  def _requeue_node(self, node, ai):
    """Shuffle node back from pending to try to reprocess (we have DNS)."""
    with self._hosts_lock:
      if node in self.pending:
        depth = self.pending[node]
        del self.pending[node]
        depth -= 1  # already incremented, will be re-incremented
        self._queue.put(('NODE+DNS', depth, (node, ai)))
        return True
      return False

  def _discard_node(self, node):
    """We have DNS and this is a dup!"""
    with self._hosts_lock:
      if node in self.pending:
        del self.pending[node]
      return True

  def _handle_dns_inthread(self, depth, entry):
    host, ai, status = entry
    with self._hosts_lock:
      if status == AddressInfo.NEW_ITEM:
        #debug('Walk: Seen DNS item for first time: %s', host)
        if not self._requeue_node(host, ai):
          self.okay_next_time.add(host)
      elif host in self.okay_next_time:
        #debug('Walk: Not first time seen DNS, but first time acting on it: %s', host)
        self._requeue_node(host, ai)
        self.okay_next_time.discard(host)
      else:
        if host == ai.hostname:
          #debug('Walk: skipping previously seen DNS: %s', host)
          pass
        else:
          #debug('Walk: discarding dup DNS: %s (%s)', host, ' '.join(ai.iter_all()))
          pass
        self._discard_node(host)
        for n in ai.iter_all():
          if n not in self.seen or (depth < self.seen[n] and depth >= 0):
            self.seen[n] = depth

  def _handle_node_inthread(self, depth, entry, ai=None):
    if entry in kSKIP_ENTRIES:
      return
    # original caller passes in original depth, or the requeue decr's
    # before passing back in to compensate
    depth += 1
    with self._hosts_lock:
      if entry in self.seen:
        #debug('Walk: already [%s]', entry)
        if depth < self.seen[entry] and depth >= 0:
          self.seen[entry] = depth
        return False
      if ai is None:
        if entry not in self.pending:
          #debug('Walk: deferring until have DNS [%s]', entry)
          self.pending[entry] = depth
          self._manager_dns.add_depthed(depth, entry)
        else:
          #debug('Walk: already deferred [%s]', entry)
          pass
        return False
      for v in ai.iter_all():
        if v in self.seen:
          #debug('Walk: already [%s] under name [%s]', entry, v)
          return False
        if v in kSKIP_ENTRIES:
          return False
      self.seen[entry] = depth
      for v in ai.iter_all():
        self.seen[v] = depth
      #debug('Walk: new %s', entry)
    self._manager_node.add_depthed(depth, entry)
    return True

  def run(self):
    self._have_managers.acquire()
    while True:
      try:
        what, depth, entry = self._queue.get()
        #debug('Walk: [%s] [%d] [%s]', what, depth, repr(entry))
        if what == 'DNS':
          self._handle_dns_inthread(depth, entry)
          continue
        if what == 'NODE':
          self._handle_node_inthread(depth, entry)
          continue
        if what == 'NODE+DNS':
          self._handle_node_inthread(depth, entry[0], entry[1])
          continue
        raise InternalUsageError('Unknown queue tag type "%s"' % what)
      finally:
        self._queue.task_done()


class DataGatherer(threading.Thread):
  """Master persistent thread responsible for keeping data current.

  This controls all data gathering threads and sleeps between collections.
  """
  def __init__(self, notify_ev, skip_nodechecker=False):
    threading.Thread.__init__(self)
    self._notify_ev = notify_ev
    self._skip_nodechecker = skip_nodechecker
    self._cmd_queue = Queue.Queue()
    self.name = 'DataGatherer'
    self.daemon = False

  def _get_cmd(self):
    if not self._notify_ev.is_set():
      return None
    self._notify_ev.clear()
    try:
      cmd = self._cmd_queue.get(block=False)
    except Queue.Empty:
      log('ProgError: notify with empty queue')
      return 'kill'
    if cmd in ('kill', 'scan'):
      return cmd
    raise InternalUsageError('Unknown DG queue cmd {%s}' % cmd)

  def _run(self):
    """The data gathering function.  Does not return until shutdown.

    This manages collection threads, puts the data together in a top-level
    container and replaces the global object with that one, under a lock,
    before sleeping to re-do this.
    """
    global sks_data, sks_data_lock

    walker = RecursingWalkManager()
    resolver = PoolWorkManager(
        'Resolve', AddressInfo, kRESOLVE_THREADS,
        walker_cb=walker.add_dns)
    node_checker = PoolWorkManager(
        'SksNodeCheck', SksNode, kSKS_POLL_THREADS,
        walker_cb=walker.add_node)
    walker.set_managers(resolver, node_checker)
    walker.start()

    peer_re = re.compile(r'^([A-Za-z0-9]\S+)\s+\d')
    while True:
      with sks_data_lock:
        if sks_data is not None:
          sks_data['updating'] = time.time()
      log('DG: Gathering data')

      start_over = False
      hostnames = set()
      with open(kSKS_MEMBERSHIP) as fn:
        config_stat = os.fstat(fn.fileno())
        for line in fn:
          m = peer_re.match(line)
          if not m:
            continue
          hostnames.add(m.group(1))
      self.hostnames = hostnames
      for h in hostnames:
        walker.set_seen(h)
        resolver.add(h)
        if not self._skip_nodechecker:
          node_checker.add(h)

      sizes = {}
      new_data = {}
      keep_checking = True
      final_safety = False
      while keep_checking:
        keep_checking = False
        for (target, pooled) in [
            (None, None),
# DNS is fast; we can finish collecting DNS after a subset, stalling on
# the URL retrievals, then continue to walk further on and have more DNS
# to do.  This is a change in integrity semantics with the introduction of
# walking.
            ('node_data', node_checker),
            ('addresses', resolver),
            ]:
          if target is not None:
            new_data[target] = pooled.collect()
            old_size = sizes.get(target, 0)
            new_size = len(new_data[target])
            if new_size != old_size:
              sizes[target] = new_size
              keep_checking = True

          cmd = self._get_cmd()
          if cmd == 'kill':
            log('DG: termination request received (during scan), returning')
            return
          if cmd == 'scan':
            start_over = True
            break
        if len(walker) or len(node_checker) or len(resolver):
          keep_checking = True
# If things completed before that len check, then we'd be incomplete
# If we check "one more time" and find differences, then we need to redo
# But if we saw empty and that last "one more time" saw no changes, then
# we've shown that in the time between seeing no changes and the time
# when the queue was empty, there were no changes because the next time
# around there were no changes either.  So we've covered that interval.
        if keep_checking:
          final_safety = False
        elif not final_safety:
          keep_checking = True
          final_safety = True
# I *think* the above is correct but should look into formally proving it

      if start_over:
        log('DG: early termination of run, starting over')
        continue

      new_data.update({
        'mtime': config_stat.st_mtime,
        'gatherer': self,
        })
      with sks_data_lock:
        sks_data = new_data


      gc.collect()
      delay = float(random.randint(
          kINTER_SCAN_INTERVAL_SECS - kINTER_SCAN_INTERVAL_JITTER,
          kINTER_SCAN_INTERVAL_SECS + kINTER_SCAN_INTERVAL_JITTER))
      log('DG: Gathering data complete, sleeping %.0f seconds', delay)
      sleeping = True
      while sleeping:
        sleep_starting = time.time()
        self._notify_ev.wait(delay)

        slept = time.time() - sleep_starting
        if float(slept) > (delay - 1):
          sleeping = False
        else:
          delay -= float(slept)

        cmd = self._get_cmd()
        if cmd == 'kill':
          log('DG: termination request received, returning')
          return
        if cmd == 'scan':
          log('DG: received scan request, starting now')
          sleeping = False
          continue

  def run(self):
    """Wrapper around _run to collect exceptions safely."""
    global sks_data, sks_data_lock
    try:
      self._run()
    except Exception, e:
      with sks_data_lock:
        sks_data = { 'exception': e }

  def kill(self):
    """Returns true iff successfully killed."""
    self._cmd_queue.put('kill')
    self._notify_ev.set()
    self.join(3.0)
    return not self.is_alive()

  def rescan(self, reason=None):
    """Cut short the sleep, scan now."""
    global sks_data, sks_data_lock
    if reason is None:
      reason = 'no reason given for rescan'
    with sks_data_lock:
      if 'rescan_triggered' in sks_data:
        log('DG/rescan ignoring retrigger [%s]', reason)
        return False
      sks_data['rescan_triggered'] = time.time()
    log('DG/rescan asking for scan [%s]', reason)
    self._cmd_queue.put('scan')
    self._notify_ev.set()
    return True


def gen_page_namespace(**kwdict):
  """Return a page namespace dictionary with standard values filled in."""
  page_namespace = dict()
  #TODO: should really URL-encode maintainer too
  page_namespace['maintainer'] = html_escape(kMAINT_EMAIL)
  page_namespace['my_hostname'] = html_escape(kHOSTNAME)
  page_namespace.update(kwdict)
  return page_namespace


def handle_peers_page(environ, start_response):
  """Write the peers page.  WSGI."""
  global sks_data, sks_data_lock

  still_gathering = True
  try:
    with sks_data_lock:
      _sks_data = sks_data
    if _sks_data is None:
      addresses = {}
      node_data = {}
      last_mtime = 0.0
      gatherer = None
      updating_since = None
    elif 'addresses' in _sks_data:
      still_gathering = False
      addresses = _sks_data['addresses']
      node_data = _sks_data['node_data']
    elif 'exception' in _sks_data:
      data_gather_exception = _sks_data['exception']
      addresses = {}
      node_data = {}
    else:
      raise InternalUsageError("sks_data is not None but lacks addresses field")
    if _sks_data is not None:
      last_mtime = _sks_data.get('mtime', 0.0)
      gatherer = _sks_data.get('gatherer')
      updating_since = _sks_data.get('updating')
    del _sks_data

    page_namespace = gen_page_namespace()
    page_head = Cheetah.Template.Template(kPAGE_TEMPLATE_HEAD, searchList=[page_namespace])
    page_foot = Cheetah.Template.Template(kPAGE_TEMPLATE_FOOT, searchList=[page_namespace])

    if updating_since is None:
      page_namespace['scanning_active'] = ''
    else:
      scanning_for = time.time() - updating_since
      page_namespace['scanning_active'] = '<h3>Data gathering run in progress (for the last %.0f seconds)</h3>' % scanning_for
    page_namespace['warning'] = still_gathering and '<h2>Still gathering data!</h2>' or ''
    if 'data_gather_exception' in locals():
      page_namespace['warning'] = '<h2>DATA GATHERING FAILED</h2><div>%s</div>' % html_escape(data_gather_exception)

    if gatherer is not None:
      if not gatherer.is_alive():
        page_namespace['warning'] += '<h2>DATA GATHERING THREAD DEAD</h2>'
        suicide_delayed()

    if addresses:
      config_stat = os.stat(kSKS_MEMBERSHIP)
      if config_stat.st_mtime != last_mtime:
        if gatherer.rescan('membership mtime update'):
          page_namespace['warning'] += '\n<h2>Config changed, will rescan in background</h2>'

    host_namespace = dict()
    host = Cheetah.Template.Template(kPAGE_TEMPLATE_HOST, searchList=[host_namespace, page_namespace])
    hostmore = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTMORE, searchList=[host_namespace, page_namespace])
    hosterr = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTERR, searchList=[host_namespace, page_namespace])
  except Exception, e:
    start_response('500 Error', [('Content-Type', 'text/plain; charset=UTF-8')], sys.exc_info())
    yield 'Bleh, we failed:\n\n'
    yield str(e)
    yield '\n'
    raise StopIteration()

  start_response('200 OK', [
    ('Content-Type', 'text/html; charset=UTF-8'),
    ])

  # PEP 333 is explicit that WSGI uses strings, not Unicode
  yield str(page_head)

  count_total = 0
  count_direct = 0
  ordered_entries = []
  by_depth = {}  # depth, hostname
  if node_data:
    for h, v in node_data.iteritems():
      if hasattr(v, 'distance'):
        depth = int(v.distance)
      else:
        depth = -1
      if depth not in by_depth:
        by_depth[depth] = []
      by_depth[depth].append(h)
    if -1 in by_depth:
      # distance -1 means "unreached"; show those after the deepest real
      # depth rather than sorting them first
      shuffle = by_depth[-1]
      del by_depth[-1]
      shuffle_to = by_depth and max(by_depth) + 1 or 0
      by_depth[shuffle_to] = shuffle
    for d in sorted(by_depth.keys()):
      ordered_entries.extend(_sort_hostnames(by_depth[d]))
    count_direct = len(by_depth.get(0, []))
  else:
    ordered_entries = _sort_hostnames(addresses.keys())
    count_direct = len(addresses)

  for h in ordered_entries:
    host_namespace.clear()
    host_namespace['hostname'] = html_escape(h)
    info = addresses.get(h, [('n/a', None)])
    sksnode = node_data.get(h, False)
    count_total += 1
    host_namespace['rowcount'] = count_total
    host_namespace['rowclass'] = count_total % 2 and 'odd' or 'even'

    if isinstance(info, Exception) or not info:
      host_namespace['error'] = str(info)
      yield str(hosterr)
      continue

    if len(info) > 1:
      host_namespace['rowspan'] = ' rowspan="%d"' % len(info)
    else:
      host_namespace['rowspan'] = ''
    sks_url = 'http://%s:11371/pks/lookup?op=stats' % h
    html_link = '<a href="%s">%s</a>' % html_interp(sks_url, h)
    host_namespace['sks_url'] = sks_url
    host_namespace['html_link'] = html_link

    if isinstance(sksnode, Exception) or not sksnode:
      host_namespace['mutual'] = 'Err'
      host_namespace['version'] = '?'
      host_namespace['keycount'] = '0'
      if hasattr(sksnode, 'distance'):
        host_namespace['distance'] = str(sksnode.distance)
      else:
        host_namespace['distance'] = '?'
    else:
      host_namespace['mutual'] = sksnode.mutual and 'Yes' or 'No'
      host_namespace['version'] = str(sksnode.version)
      host_namespace['keycount'] = str(sksnode.keycount)
      host_namespace['distance'] = str(sksnode.distance)

    if getattr(info, 'alt_names', None):
      host_namespace['host_aliases_text'] = \
          ' <span class="host_aliases">%s</span>' % ' '.join(info.alt_names)
    else:
      host_namespace['host_aliases_text'] = ''

    first = True
    for item in info:
      host_namespace['ip'] = html_escape(item[0])
      geo = item[1]
      if geo is None:
        host_namespace['geo'] = ''
      else:
        host_namespace['geo'] = ' '.join(geo).upper()
      if first:
        yield str(host)
        first = False
      else:
        yield str(hostmore)

  page_namespace['peer_count'] = count_direct
  page_namespace['mesh_count'] = count_total
  yield str(page_foot)


def sks_peers_init(skip_nodechecker=False):
  """Init global state, creating threads as needed.

  On first call, returns an object which has a kill() method, to try to shut
  things down cleanly.  On subsequent calls, returns None.

  Normally we will only ever be called once, but if we're being served as an
  app so that we're launched on-demand then we'll be called via application();
  if we are set to multi-threaded, then there can be concurrent calls to
  application() and thus a race.  Break the race with a lock.

  It's okay to create a lock at top-level, but not threads.
  """
  global startup_lock, sks_data, sks_data_lock, wsgi_selector
  with startup_lock:
    if sks_data_lock is not None:
      return None
    sks_data_lock = threading.Lock()
    dg_shutdown = threading.Event()
    gatherer = DataGatherer(dg_shutdown, skip_nodechecker)
    gatherer.start()
    s = selector.Selector()
    url_handlers = []
    def _add_handler(url, func):
      url_handlers.append(url)
      s.add(url, GET=func)
    # No matter how I hook in, when using wsgiref.simple_server.make_server I
    # need '/' but with mod_wsgi it's ''.
    _add_handler('', handle_peers_page)
    _add_handler('/', handle_peers_page)
    _add_handler('/ip-valid', handle_ip_valid)
    _add_handler('/helpz', get_handler_helpz(url_handlers))
    _add_handler('/threadz', handle_threadz)
    _add_handler('/environz', handle_environz)
    _add_handler('/rescanz', handle_rescanz)
    _add_handler('/internalz', handle_internalz)
    s.status404 = return_error
    wsgi_selector = s
    syslog.openlog(kSYSLOG_NAME, syslog.LOG_PID, kSYSLOG_FACILITY)
    signal.signal(signal.SIGUSR1, act_reload_queue)
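    # e.g. "kill -USR1 <pid>" forces a rescan from the shell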
    log('Initialised (uid=%d)', os.getuid())
    return gatherer


def return_error(environ, start_response,
    summary='Page not found',
    error='The URL which you entered is invalid',
    code='404'):
  page_namespace = gen_page_namespace(
      summary=html_escape(summary),
      error=html_escape(error))
  error_page = Cheetah.Template.Template(kPAGE_TEMPLATE_BADUSER, searchList=[page_namespace])
  start_response('%s nope' % code, [
    ('Content-Type', 'text/html; charset=UTF-8'),
    ])
  yield str(error_page)
  raise StopIteration()


def get_handler_helpz(registered):
  @wrap_check_privileged
  def _handle_helpz(environ, start_response):
    start_response('200 OK', [
      ('Content-Type', 'text/html; charset=UTF-8'),
      ])
    yield kPAGE_TEMPLATE_BASIC_HEAD + \
        '<title>/helpz</title></head><body><h1>/helpz</h1><ul>\n'
    prefix = environ['SCRIPT_NAME']
    if prefix:
      end = prefix.rfind('/helpz')
      if end >= 0:
        prefix = prefix[:end]
      else:
        prefix = ''
    else:
      prefix = ''
    for x in sorted(registered):
      if not x:
        continue
      url = html_escape(prefix + x)
      yield '<li><a href="%s">%s</a></li>' % (url, url)
    yield '</ul></body></html>\n'
  return _handle_helpz

@wrap_check_privileged
def handle_threadz(environ, start_response):
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  yield 'Threads active:\n\n'
  for t in sorted(threading.enumerate()):
    aliveness = not t.is_alive() and ' DEAD' or ''
    ident = t.ident and ' [%d]' % t.ident or ''
    daemonic = not t.daemon and ' <non-daemon>' or ''
    yield '%s%s%s%s\n' % (t.name, aliveness, ident, daemonic)
  yield '.\n'


@wrap_check_privileged
def handle_environz(environ, start_response):
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  for pair in sorted(environ.items()):
    yield '%s = {%s}\n' % pair
  yield '.\n'

@wrap_check_privileged
def handle_rescanz(environ, start_response):
  global sks_data, sks_data_lock
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  if act_reload_queue(note='rescanz hit'):
    yield 'rescan triggered\n'
  else:
    yield 'rescan not triggered\n'
    with sks_data_lock:
      d = sks_data
    if d is None:
      yield 'First scan not completed, have no sks_data\n'
    elif 'rescan_triggered' in d:
      timediff = time.time() - d['rescan_triggered']
      yield 'Rescan in progress, triggered %.0f seconds ago\n' % timediff
    else:
      yield 'Reason unknown\n'

@wrap_check_privileged
def handle_internalz(environ, start_response):
  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])
  def _report_obj(depth, d, name):
    prefix = '  ' * depth + '* '
    results = []
    try:
      for i in sorted(d.keys()):
        n = name and name + '[' + i + ']' or i
        try:
          if isinstance(d[i], dict) and not i.startswith('__'):
            results.append(prefix + n + ':\n')
            results.extend(_report_obj(depth + 1, d[i], n))
          elif isinstance(d[i], (set, list)) and len(d[i]) > 1:
            results.append(prefix + n + ':\n')
            indent = ' ' * (len(prefix) + 2)
            # textwrap drops trailing newline, even if in input data
            results.append(textwrap.fill(' '.join(map(str, d[i])),
                width=110,
                initial_indent=indent, subsequent_indent=indent) + '\n')
          else:
            results.append(prefix + n + ': ' + repr(d[i]) + '\n')
            if isinstance(d[i], object) and (
                d[i].__class__.__module__ == '__main__' or
                d[i].__class__.__module__.startswith('_mod_wsgi_')):
              results.extend(_report_obj(depth + 1, d[i].__dict__, n))
            elif isinstance(d[i], type) and d[i].__module__ == '__main__':
              results.extend(_report_obj(depth + 1,
                dict([(x, getattr(d[i], x)) for x in dir(d[i]) if not x.startswith('__')]),
                n))
        except Exception, e:
          results.append('Walking dict %s failed: %s\n' % (n, e))
    except Exception, e:
      results.append('Iter dict failed %s\n' % name)
    return results
  g = globals()
  return [x for x in _report_obj(1, g, '') if isinstance(x, str)]

def handle_ip_valid(environ, start_response):
  """Emit a list of IPs, first line status/control, last line '.'"""
  global sks_data, sks_data_lock

  with sks_data_lock:
    data = sks_data

  start_response('200 OK', [
    ('Content-Type', 'text/plain; charset=UTF-8'),
    ])

  if data is None or 'node_data' not in data or not len(data['node_data']):
    yield 'IP-Gen/1: status=INVALID count=0 reason=first_scan\n.\n'
    raise StopIteration()

  ips_all = {}
  for name, node in data['node_data'].iteritems():
    try:
      ip_geo_list = data['addresses'][name]
      if node and str(node.version) != '1.0.10' and int(node.keycount) > 1:
        for ip, geo in ip_geo_list:
          ips_all[ip] = int(node.keycount)
    except Exception:
      # missing address data or an unparsed stats page; skip this peer
      continue

  # We want to discard statistic-distorting outliers, then of what remains,
  # discard those too far away from "normal", but we really want the "best"
  # servers to be our guide, so 1 std-dev of the second-highest remaining
  # value should be safe; in fact, we'll hardcode a limit of how far below.
  # To discard, find mode size (knowing that value can be split across two
  # buckets) and discard more than five stddevs from mode.  The bucketing
  # should be larger than the distance from desired value so that the mode
  # is only split across two buckets, if we assume enough servers that a
  # small number will be down, most will be valid-if-large-enough, so that
  # splitting the count across two buckets won't let the third-best value win
  buckets = {}
  for ip, count in ips_all.iteritems():
    bucket = int(count // 3000)
    if bucket not in buckets:
      buckets[bucket] = []
    buckets[bucket].append(count)
  # per the comment above we want the mode: the most-populated bucket,
  # not the bucket covering the highest keycounts
  mode_bucket = max(buckets, key=lambda b: len(buckets[b]))
  first_n = len(buckets[mode_bucket])
  first_mean = sum(buckets[mode_bucket]) / float(first_n)
  first_sd = sqrt(sum((x-first_mean)**2 for x in buckets[mode_bucket]) / first_n)
  first_bounds = (int(first_mean - 5*first_sd), int(first_mean + 5*first_sd))

  first_ips_list = filter(lambda x: first_bounds[0] <= ips_all[x] <= first_bounds[1], ips_all)
  first_ips = dict([(x, ips_all[x]) for x in first_ips_list])
  second_mean = sum(first_ips.values()) / len(first_ips)

  if second_mean < kIPLIST_SANITY_MIN:
    yield 'IP-Gen/1: status=INVALID count=0 reason=broken_data\n.\n'
    raise StopIteration()

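  # Worked example (made-up numbers): if the surviving keycounts are
  # 2412000, 2411500 and 2410000, the second-highest is 2411500, so the
  # threshold is 2411500 - 500 = 2411000 and the 2410000 server drops
  # out of the generated list.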
  threshold = sorted(first_ips.values())[-2] - kIPLIST_NEED_THRESHOLD
  ips = [x for x in first_ips if first_ips[x] >= threshold]

  count = len(ips)
  log('ip-valid: Yielding %d of %d values', count, len(ips_all))
  yield 'IP-Gen/1: status=COMPLETE count=%d tags=skip_1010\n' % count
  yield '\n'.join(ips)
  yield '\n.\n'


def application(environ, start_response):
  """The main entry point for serving as an application.

  WSGI interface.

  Will init global context if not already init'd.
  """
  global sks_data_lock, wsgi_selector

  if sks_data_lock is None:
    # We're embedded in a web-server and need to bootstrap
    sks_peers_init()

  if wsgi_selector is None:
    return return_error(environ, start_response,
        'Internal problem', 'Failed to initialise correctly, please report this', '500')

  i_am = environ.get('SCRIPT_NAME', None)
  if i_am is not None:
    t = environ.get('REQUEST_URI','').replace(i_am, '', 1)
    if not len(t):
      t = '/'
    environ['REQUEST_URI'] = t
  return wsgi_selector(environ, start_response)


def main(argv=None):
  """Entry point when invoked as a normal binary."""
  if argv is None:
    argv = []
  if len(argv) and argv[0] == 'standalone':
    from wsgiref.simple_server import make_server
    argv.pop(0)

  if len(argv) and argv[0] == 'nodata':
    shutdown = sks_peers_init(skip_nodechecker=True)
    argv.pop(0)
  else:
    shutdown = sks_peers_init()
  if shutdown is None:
    raise InternalUsageError('init failed to return a shutdown object')

  if 'make_server' in locals():
    server_host = 'localhost'
    port = 8080

    if len(argv) >= 1:
      if argv[0].startswith(':'):
        argv.insert(0, 'localhost')
        argv[1] = argv[1][1:]
      else:
        server_host = argv[0]

    try:
      if len(argv) >= 2:
        port = int(argv[1])
      if port < 2 or port > 65535:
        raise OverflowError('port out of range')
    except ValueError, e:
      print >>sys.stderr, 'Unable to parse as valid port: %s' % argv[1]
    except OverflowError, e:
      print >>sys.stderr, 'Port number %d out of range, resetting' % port
      port = 8080

    global wsgi_selector
    server = make_server(server_host, port, wsgi_selector)
    print >>sys.stderr, 'Starting up webserver listening on {%s} %d' % (
        server_host, port)
    try:
      server.serve_forever()
    except KeyboardInterrupt, e:
      print >>sys.stderr, 'Received keyboard interrupt, shutting down'
      if not shutdown.kill():
        print >>sys.stderr, 'Shutdown failed, aargh.'
        sys.exit(1)

  else:
    import wsgiref.handlers
    global sks_data, sks_data_lock
    scanning = True
    while scanning:
      with sks_data_lock:
        _sks_data = sks_data
      if _sks_data is None:
        time.sleep(1)
      else:
        scanning = False
    wsgiref.handlers.CGIHandler().run(handle_peers_page)
    if shutdown.kill():
      return 0
    return 1


if __name__ == '__main__':
  main(sys.argv[1:])

# vim: set filetype=python sw=2 expandtab :
