I'm trying to implement an application that will listen for requests,
run them when they are present but also be able to add new requests
even while it's running. I've tried to do this using the thread and
xmlrpc modules - the idea is that an XML-RPC exposed object tells the
queue thread object to add a job. If there are no jobs running, it
creates a file, adds the new job to the end and then another
consumption thread starts working through the jobs in the file. New
jobs coming in are just added to the end of the file by the queue
thread.
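
To make the intended behaviour concrete, here is a minimal sketch of the
same producer/consumer idea using an in-memory queue instead of a file.
This is not the code I'm running (that is posted below). It assumes
Python 3, where the `thread` and `Queue` modules became `threading` and
`queue`, and `run_job`, `worker`, `results` and `STOP` are made-up names
for illustration only.

```python
import queue
import threading

jobs = queue.Queue()   # thread-safe FIFO; stands in for the queue file
results = []           # hypothetical: records jobs in the order they ran
STOP = object()        # sentinel that tells the worker to exit


def run_job(job):
    # placeholder for the real work
    results.append(job)


def worker():
    # consumption thread: block until a job arrives, run it, repeat
    while True:
        job = jobs.get()
        if job is STOP:
            break
        run_job(job)


t = threading.Thread(target=worker)
t.start()

# jobs can be added while the worker is already running
for name in ("job1", "job2", "job3"):
    jobs.put(name)

jobs.put(STOP)
t.join()
print(results)
```

With a single worker the jobs run in the order they were added, which is
the behaviour I want from the file-based version as well.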

Unfortunately, I can't get it to work. The problem is that the
consumption thread seems to read the job queue before it gets written,
even though I've used a lock. I've also had the application get to the
stage where it ignores ctrl-c, which is a little worrying - I fear it
doesn't bode well for future stability... I don't have a lot of
experience with multi-threaded applications, so I may well have chosen
a poor approach.
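
One thing I am not sure about is whether file buffering plays a part: a
line written through an open file object is not necessarily on disk
until that file is flushed or closed, so a second handle opened on the
same path may see nothing. A small sketch of that effect, separate from
my application (Python 3 syntax; the temporary path and the e-mail
address are made up for the example):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "queue.txt")

fh = open(path, "a")
fh.write("job1::::someone@example.com\n")  # held in fh's buffer for now

with open(path) as rfh:
    before_close = rfh.readline()  # a second handle reads the file as-is

fh.close()  # close() flushes the buffered line to disk

with open(path) as rfh:
    after_close = rfh.readline()

print(repr(before_close), repr(after_close))
```

If this applies to my code, holding the lock would not help on its own,
because the lock only orders the two threads and does not force the
buffered line out to the file.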

I've posted the code below. It's in three parts: the job queue, the
manager that listens for new requests, and an application to add jobs
to the queue. Sorry for the long listings...

Any guidance gratefully received,

Peter

===
testqueue.py:

import thread
import time
import shutil
import os

class JobQueue:

        def __init__(self, filename):
                self.queuefile = filename
                self.jobthread = 0
                # lock for the jobfile queue file
                self.jfqlock = thread.allocate_lock()

        def addJob(self, jobfileuri, email):
                self.jfqlock.acquire()
                if not self.jobthread:
                        print "starting jobfile consumption thread"
                        if os.access(self.queuefile, os.R_OK):
                                print "cleaning stale jobfile queue file"
                                try:
                                        os.remove(self.queuefile)
                                except:
                                        print "problem removing jobfile queue file"
                                        raise
                        self.jobthread = thread.start_new_thread(self.main, ())
                else:
                        print "using existing jobfile consumption thread in file", self.queuefile
                fh = open(self.queuefile, "a")
                # choose "::::" as a delimiter
                print >> fh, jobfileuri + "::::" + email
                self.jfqlock.release()
                return 1

        def main(self):
                while 1:
                        self.jfqlock.acquire()
                        rfh = open(self.queuefile, "r")
#                       breakpoint()
                        finput = rfh.readline()
                        print "found:", finput
                        if not finput:
                                print "finished jobfiles. Closing thread"
                                rfh.close()
                                self.jobthread = 0
                                self.jfqlock.release()
                                return
                        else:
                                print "found jobfile in queue: attempting to run"
                                # most of this is to shift up the lines in the file
                                tmpname = self.queuefile + ".tmp"
                                wfh = open(tmpname, "w")
                                line = rfh.readline()
                                while line:
                                        wfh.write(line)
                                        line = rfh.readline()
                                wfh.close()
                                rfh.close()
                                shutil.move(tmpname, self.queuefile)
                                self.jfqlock.release()
                                # lop off the trailing line break
                                print
                                print "***run Starting***"
                                try:
                                        self.runJob(finput[:-1])
                                        print "***run finished***"
                                except:
                                        print "***run failed***"
                                print

        def runJob(self, job):
                time.sleep(2.0)
                print "running job", job
                if not report:
                        print "some problem with run. Cannot mail out report"
                        return


===
queuemanager.py

from testqueue import JobQueue
from SimpleXMLRPCServer import *


class QM:
        def __init__(self, filename):
                self.jq = JobQueue("queue.txt")

        def addJob(self, jobname):
                self.jq.addJob(jobname, "[EMAIL PROTECTED]")

if __name__=="__main__":
        qm = QM("jobqueue.txt")
        rpcserver = SimpleXMLRPCServer(("localhost", 8000))
        rpcserver.register_instance(qm)
        rpcserver.serve_forever()

===
addjob.py:

import xmlrpclib
import sys

server = "localhost"
port = 8000

serveradd = "http://%s:%s" % (server, port)
manager = xmlrpclib.ServerProxy(serveradd)

jobname = sys.argv[1]

manager.addJob(jobname)
--
http://mail.python.org/mailman/listinfo/python-list
