http://www.mediawiki.org/wiki/Special:Code/MediaWiki/70659

Revision: 70659
Author:   tstarling
Date:     2010-08-08 02:20:56 +0000 (Sun, 08 Aug 2010)

Log Message:
-----------
Incomplete pool counter daemon by Domas Mituzas.

Added Paths:
-----------
    trunk/extensions/PoolCounter/poolcounter.py

Added: trunk/extensions/PoolCounter/poolcounter.py
===================================================================
--- trunk/extensions/PoolCounter/poolcounter.py                         (rev 0)
+++ trunk/extensions/PoolCounter/poolcounter.py 2010-08-08 02:20:56 UTC (rev 
70659)
@@ -0,0 +1,228 @@
+#!/usr/bin/env python
+#
+# Better run this with twistd -r epoll -y poolcounter.py -u nobody -g nogroup
+#
+
+from twisted.internet import reactor, protocol
+from twisted.protocols import basic, policies
+from twisted.application import internet, service
+
+import time
+
+port=7531
+
+class Error:
+       badcomm = (1, "Bad command or not enough arguments")
+       nokey = (2, "%s No such key acquired by current connection")
+       haveit = (3, "%s Already acquired")
+       
+       def __init__(self,error,params=[]):
+               self.errno=error[0]
+               self.message=error[1]
+               self.params=params
+               pass
+               
+       def msg(self):
+               return "ERROR %03d %s\n" % (self.errno, self.message % 
self.params)
+
+
+class CounterDatabase(dict):
+       """Counter database organizes counter objects by their name,
+       and provides basis for on-demand creation and destruction of counters
+       
+       It maps:
+               1:1 with CounterFactory
+               1:n with Counter (linked both sides)
+       """
+       def acquireCounter(self, name, client,options={}):
+               """Acquires (and if needed) creates a counter 
+               for a client, as well as increments counts"""
+               
+               if name not in self:
+                       counter = Counter(name, self)
+               else:
+                       counter = self[name]
+                       
+               if client in counter: 
+                       raise Error(Error.haveit, name)
+               
+               counter.acquire(client)
+               return counter
+               
+       def releaseCounter(self, name, client, options={}):
+               """Releases, and if necessary, destroys counter object"""
+               try: 
+                       counter = self[name]
+                       counter[client]
+               except KeyError:
+                       raise Error(Error.nokey, name)
+
+               # Counter manages it's own membership inside the database
+               counter.release(client)
+
+
+class Counter(dict):
+       """Counter object tracks and counts 
+       clients that have acquired it"""
+       # count is size of dictionary
+       count = property(fget=len)
+       age = property(fget = lambda self: int(time.time() - self.init_time))
+       
+       def __hash__(self):
+               """Make this object usable as key in other dictionaries, 
+               after losing such property by inheriting (dict)"""
+               return id(self);
+       
+       def __init__(self, name, db):
+               """Register object in supplied database"""
+               self.database = db
+               self.name = name
+               self.database[name] = self
+               self.init_time = time.time()
+
+       def acquire(self, client):
+               """Register both client inside counter, and counter at client"""
+               self[client] = True
+               client.counts[self] = True
+       
+       def release(self, client):
+               del self[client]
+               del client.counts[self]
+               
+               if len(self) == 0:
+                       del self.database[self.name]
+
+
+class CounterClient(basic.LineReceiver,policies.TimeoutMixin):
+       """Counter protocol, basic functions and connection tracking"""
+       def __init__(self):
+               """Initialize counter objects held by a client"""
+               self.counts = {}
+               
+       def error(self, error, parts=[]):
+               """Write an error, based either on exception message or custom 
error tuple"""
+               if isinstance(error,Error):
+                       self.transport.write(error.msg())
+               else:
+                       self.transport.write("ERROR %03d %s\n" % (error[0], 
error[1] % parts) )
+       
+       def connectionMade(self):
+               """When connection is established, add it to factory 
+               public list of connections, and set timeouts"""
+               self.factory.conns[self] = True
+               self.setTimeout(300)
+               
+       def lineReceived(self,line):
+               """Process the request line, and invoke necessary actions"""
+               
+               request = line.split()
+               
+               self.resetTimeout()
+               self.factory.stats_requests += 1
+               
+               # Development, debugging and introspection functions
+               if (len(request)<=1):
+                       if len(request) == 0: 
+                               pass
+                       elif request[0] == "quit":
+                               self.transport.loseConnection()
+                       # Show counts acquired by connections
+                       elif request[0] == "conns":
+                               self.transport.write( str([(k, [c.name for c in 
k.counts]) 
+                                       for k in self.factory.conns])+"\n")
+                       # Show counter values
+                       elif request[0] == "counts":
+                               self.transport.write(str([(k,v.count) for k,v 
in 
+                                       self.factory.database.items()])+ "\n")
+                       # Just die :) 
+                       elif request[0] == "die":
+                               reactor.stop()
+                       # Basic statistics
+                       elif request[0] == "stats":
+                               self.transport.write("Counters: 
%d\nConnections: %d\nRequests: %d\n" % 
+                                       (len(self.factory.database), 
len(self.factory.conns), self.factory.stats_requests))
+                       else:
+                               self.error(Error.badcomm)
+                       return
+
+               # From here on verbs need nouns and more
+               # This is where real work starts
+               # First we save options provided
+               options = dict([len(y.split(":", 2)) > 1 and y.split(":", 2) or 
(y, True) 
+                       for y in request[2:]])
+
+               verb = request[0]
+               key = request[1]
+
+               # Acquire can specify:
+               # XXX - notification thresholds
+               # XXX - ???
+               # XXX - Profit!!!
+               
+               if verb == "acquire":
+                       # Parse the options, format is: a:x b c:y ...
+                       try: 
+                               counter = 
self.factory.database.acquireCounter(key, self, options)
+                       except Error, e:
+                               self.error(e) 
+                               return
+                       self.transport.write("ACK %s count:%d age:%d\n" % (key, 
counter.count, counter.age) )
+               # Release can specify options:
+               # XXX - abandoned (don't notify)
+               elif verb == "release":
+                       try:
+                               self.factory.database.releaseCounter(key, self, 
options)
+                       except Error, e:
+                               self.error(e)
+                               return
+                       self.transport.write("RELEASED %s\n" % key)
+               # Noop counter fetch
+               elif verb == "count":
+                       if key in self.factory.database:
+                               counter=self.factory.database[key]
+                               count=counter.count
+                               age=counter.age
+                       else:
+                               count=0
+                               age=0
+                       self.transport.write("COUNT %s count:%d age:%d\n" % 
(key, count) )
+               else:
+                       self.error(Error.badcomm)
+                       return
+                       
+       def connectionLost(self, reason):
+               """Abandon counters and deregister connection on lost 
connection in case:
+                       * Disconnected
+                       * "quit" command received
+                       * Timeout hit
+               """
+               for counter in self.counts.copy(): 
+                       counter.release(self)
+               del self.factory.conns[self]
+
+
+class CounterFactory(protocol.ServerFactory):
+       """Counter instance object, owns database and bunch of clients"""
+       protocol = CounterClient
+       
+       def __init__(self):
+               self.database = CounterDatabase()
+               self.stats_requests = 0
+               self.conns = {}
+
+# Linux reactor
+try:
+       from twisted.internet import epollreactor
+       epollreactor.install()
+except: pass
+
+factory = CounterFactory()
+application = service.Application("poolcounter")
+counterservice = internet.TCPServer(port, factory)
+counterservice.setServiceParent(application)
+
+# If invoked directly and ot via twistd...
+if __name__=="__main__":
+       reactor.listenTCP(port, factory)
+       reactor.run()
+


Property changes on: trunk/extensions/PoolCounter/poolcounter.py
___________________________________________________________________
Added: svn:executable
   + *
Added: svn:eol-style
   + native



_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to