I'm trying to implement an application that listens for requests and runs them as they arrive, while still accepting new requests when a job is already running. I've tried to do this using the thread and xmlrpc modules. The idea is that an XML-RPC exposed object tells the queue thread object to add a job. If there are no jobs running, it creates a file, adds the new job to the end, and then starts a separate consumption thread that works through the jobs in the file. New jobs coming in are just added to the end of the file by the queue thread.
Unfortunately, I can't get it to work. The problem is that the consumption thread seems to read the job queue before it gets written, even though I've used a lock. I've also had the application get to the stage where it ignores ctrl-c, which is a little worrying - I fear it doesn't bode well for future stability... I don't have a lot of experience with multi-threaded applications, so I may well have chosen a poor approach.

I've posted the code below. It's in three parts, the job queue, the manager that listens for new requests and an application to add jobs to the queue. Sorry for the long listings...

Any guidance gratefully received,

Peter

=== testqueue.py:

import thread
import time
import shutil
import os

class JobQueue:
    def __init__(self, filename):
        self.queuefile = filename
        self.jobthread = 0
        # lock for the jobfile queue file
        self.jfqlock = thread.allocate_lock()

    def addJob(self, jobfileuri, email):
        self.jfqlock.acquire()
        if not self.jobthread:
            print "starting jobfile consumption thread"
            if os.access(self.queuefile, os.R_OK):
                print "cleaning stale jobfile queue file"
                try:
                    os.remove(self.queuefile)
                except:
                    print "problem removing jobfile queue file"
                    raise
            self.jobthread = thread.start_new_thread(self.main, ())
        else:
            print "using existing jobfile consumption thread in file", self.queuefile
        fh = open(self.queuefile, "a")
        # choose "::::" as a delimiter
        print >> fh, jobfileuri + "::::" + email
        self.jfqlock.release()
        return 1

    def main(self):
        while 1:
            self.jfqlock.acquire()
            rfh = open(self.queuefile, "r")
            # breakpoint()
            finput = rfh.readline()
            print "found:", finput
            if not finput:
                print "finished jobfiles. Closing thread"
                rfh.close()
                self.jobthread = 0
                self.jfqlock.release()
                return
            else:
                print "found jobfile in queue: attempting to run"
                # most of this is to shift up the lines in the file
                tmpname = self.queuefile + ".tmp"
                wfh = open(tmpname, "w")
                line = rfh.readline()
                while line:
                    wfh.write(line)
                    line = rfh.readline()
                wfh.close()
                rfh.close()
                shutil.move(tmpname, self.queuefile)
                self.jfqlock.release()
                # lop off the trailing line break
                print
                print "***run Starting***"
                try:
                    self.runJob(finput[:-1])
                    print "***run finished***"
                except:
                    print "***run failed***"
                print

    def runJob(self, job):
        time.sleep(2.0)
        print "running job", job
        if not report:
            print "some problem with run. Cannot mail out report"
            return

=== queuemanager.py

from testqueue import JobQueue
from SimpleXMLRPCServer import *

class QM:
    def __init__(self, filename):
        self.jq = JobQueue("queue.txt")

    def addJob(self, jobname):
        self.jq.addJob(jobname, "[EMAIL PROTECTED]")

if __name__=="__main__":
    qm = QM("jobqueue.txt")
    rpcserver = SimpleXMLRPCServer(("localhost", 8000))
    rpcserver.register_instance(qm)
    rpcserver.serve_forever()

=== addjob.py:

import xmlrpclib
import sys

server = "localhost"
port = 8000
serveradd = "http://%s:%s" % (server, port)
manager = xmlrpclib.ServerProxy(serveradd)
jobname = sys.argv[1]
manager.addJob(jobname)
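
One possible explanation for the early read, offered as an assumption rather than a confirmed diagnosis: addJob never closes or flushes fh before releasing the lock, so the appended line may still be sitting in the file object's buffer when the consumption thread opens the file. The sketch below is a Python 3 variant of the same file-backed queue that closes the file before the lock is released. The class name FileJobQueue, the methods add_job and pop_job, and the example address are hypothetical and not part of the listings above.

```python
import os
import tempfile
import threading


class FileJobQueue:
    """File-backed FIFO of job lines (hypothetical sketch, Python 3)."""

    def __init__(self, filename):
        self.filename = filename
        self.lock = threading.Lock()

    def add_job(self, jobfileuri, email):
        # Both with-blocks exit in order: the file is closed (and so
        # flushed to disk) first, and only then is the lock released.
        with self.lock:
            with open(self.filename, "a") as fh:
                # same "::::" delimiter as the original listing
                fh.write(jobfileuri + "::::" + email + "\n")

    def pop_job(self):
        """Remove and return the first job line, or None if the queue is empty."""
        with self.lock:
            if not os.path.exists(self.filename):
                return None
            with open(self.filename, "r") as fh:
                lines = fh.readlines()
            if not lines:
                return None
            # Rewrite the file without its first line.
            with open(self.filename, "w") as fh:
                fh.writelines(lines[1:])
            return lines[0].rstrip("\n")


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "queue.txt")
    jobs = FileJobQueue(path)
    jobs.add_job("job1.xml", "user@example.com")
    jobs.add_job("job2.xml", "user@example.com")
    print(jobs.pop_job())
    print(jobs.pop_job())
    print(jobs.pop_job())
```

A consumer thread could call pop_job in a loop and run each job outside the lock, as main does above. If the jobs do not need to survive a restart, the standard library's thread-safe in-memory queue may be a simpler alternative to the file.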