If you don't mind to use the coroutine library eventlet you can
implement a single threaded solution. See example below. For your use
case you need to change controller to load the shelve every
eventlet.sleep(n) seconds.
Regards, Andreas
# eventlet single thread demo
prc_publish.eventlet
Price Publisher
# imports
from argparse import ArgumentParser
import eventlet
import logging
import os
import random
import sys
import cPickle as pickle
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info(starting '%s %s', os.path.basename(argv[0]),
.join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description=Price Publisher)
parser.add_argument(-f, --file, dest=filename,
help=read configuration from %(dest)s)
parser.add_argument(-p, --port, default=8001, type=int,
help=server port [default: %(default)s)
args = parser.parse_args()
print args
# create product dict
prds = { }
pubqs = []
for n in range(10):
key = AB + {:04}.format(n)
prds[AB + key] = Pricer(key)
# start one thread for price changes
eventlet.spawn(controller, prds, pubqs)
address = ('localhost', 8010)
eventlet.spawn(listener, address, pubqs)
# main thread runs eventlet loop
while True:
eventlet.sleep(10)
def listener(address, pubqs):
sock = eventlet.listen(address)
while True:
LOG.info('waiting for connection on %s', address)
cx, remote = sock.accept()
LOG.info(accepting connection from %s, remote)
inq = eventlet.queue.Queue()
pubqs.append(inq)
eventlet.spawn(receiver, cx)
eventlet.spawn(publisher, pubqs, inq, cx)
def publisher(pubqs, inq, cx):
LOG.info(Publisher running)
try:
while True:
# what happens if client does not pick up
# what happens if client dies during queue wait
try:
with eventlet.Timeout(1):
item = inq.get()
s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL)
# s = {0[0]} {0[1]}\n\r.format(item)
cx.send(s)
except eventlet.Timeout:
# raises IOError if connection lost
cx.fileno()
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
pubqs.remove(inq)
LOG.info(Publisher terminated)
def receiver(cx):
LOG.info(Receiver running)
try:
while True:
# what happens if client does not pick up
s = cx.recv(4096)
if not s:
break
LOG.info(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info(Receiver terminated)
def controller(prds, pubqs):
while True:
LOG.info(controller: price update cycle, %i pubqs,
len(pubqs))
Pricer.VOLA = update_vola(Pricer.VOLA)
for prd in prds.values():
prd.run()
for pubq in pubqs:
pubq.put((prd.name, prd.prc))
eventlet.sleep(5)
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Pricer(object):
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
def run(self):
self.prc += random.choice((-1, +1)) * self.prc * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).
4s %(funcName)10s: %(message)s',
datefmt='%H:%M:%S')
main()
--
http://mail.python.org/mailman/listinfo/python-list