On Wed, 28 Jun 2006, Terry Carroll wrote: > On Wed, 28 Jun 2006, Tino Dai wrote: > > > Ok, I think I'm going to back up and explain what I'm am heading towards. > > I'm working on an app that fire off a bunch of threads. Each one of these > > threads in connected via queues to another thread in a sequence like a > > chain. And how I tell the next stage thread that there is data in the queue > > is via semaphore. > > You can just use a series of Queues, where each Queue represents the work > being passed from one thread to the other....
Okay, I was bored tonight, so I cooked up an illustration. Here's an example with five stages. Stage 1 takes a string and fills an input queue with a series of letters from the string. Stages 2-4 do just take a letter off its input queue and move it to its output queue. Stage 5 takes a letter off its input queue and then assembles it into the string and prints it once complete. A token "stop" rather than a letter on the queue is a signal to shut down. Each stage recognizes the token when received, passes it onto its output queue (if any) and then shuts down. Status messages show the progress and threading; a pause of random duration keeps things from being too predictable. Code: import time, Queue, threading, random def randompause(): time.sleep(random.randint(1,5)) def statusmessage(id, workunit): print "%s thread %s, workunit %s" %(time.asctime(), id, workunit) def endmessage(id): print "%s thread %s ending" %(time.asctime(), id) class processor(threading.Thread): def __init__(self, id, inQ=None, outQ=None, inmessage=None): """ id is thread Id: "first" for initial producer, "last" for final consumer inQ is input Q, or None for first producer outQ is outputQ, or None for final consumer """ self.inQ = inQ self.outQ = outQ self.id = id if self.id == "first": self.l = list(inmessage) if self.id == "last": self.message="" threading.Thread.__init__(self) def producer(self): while True: randompause() try: workunit = self.l.pop(0) except IndexError: self.outQ.put("stop") endmessage(self.id) return statusmessage(self.id, workunit) self.outQ.put(workunit) def consumer(self): while True: randompause() workunit = self.inQ.get(True) if workunit == "stop": print "final message:", self.message endmessage(self.id) return else: statusmessage(self.id, workunit) self.message = self.message+workunit def hybrid(self): while True: randompause() workunit = self.inQ.get(True) if workunit == "stop": self.outQ.put(workunit) endmessage(self.id) return else: statusmessage(self.id, workunit) self.outQ.put(workunit) def run(self): if self.id == "first": processor.producer(self) elif self.id == "last": processor.consumer(self) else: processor.hybrid(self) if __name__ == "__main__": q_ab = Queue.Queue() pa = processor(id="first", inmessage="spam", outQ=q_ab) q_bc = Queue.Queue() pb = processor(id="second", inQ=q_ab, outQ=q_bc) q_cd = Queue.Queue() pc = processor(id="third", inQ=q_bc, outQ=q_cd) q_de = Queue.Queue() pd = processor(id="fourth", inQ=q_cd, outQ=q_de) pe = processor(id="last", inQ=q_de) pa.start() pb.start() pc.start() pd.start() pe.start() Result: Thu Jun 29 00:37:30 2006 thread first, workunit s Thu Jun 29 00:37:31 2006 thread first, workunit p Thu Jun 29 00:37:32 2006 thread first, workunit a Thu Jun 29 00:37:33 2006 thread second, workunit s Thu Jun 29 00:37:33 2006 thread third, workunit s Thu Jun 29 00:37:33 2006 thread fourth, workunit s Thu Jun 29 00:37:33 2006 thread last, workunit s Thu Jun 29 00:37:34 2006 thread first, workunit m Thu Jun 29 00:37:37 2006 thread second, workunit p Thu Jun 29 00:37:37 2006 thread third, workunit p Thu Jun 29 00:37:38 2006 thread first ending Thu Jun 29 00:37:38 2006 thread fourth, workunit p Thu Jun 29 00:37:38 2006 thread last, workunit p Thu Jun 29 00:37:41 2006 thread second, workunit a Thu Jun 29 00:37:41 2006 thread third, workunit a Thu Jun 29 00:37:41 2006 thread fourth, workunit a Thu Jun 29 00:37:41 2006 thread last, workunit a Thu Jun 29 00:37:43 2006 thread second, workunit m Thu Jun 29 00:37:43 2006 thread third, workunit m Thu Jun 29 00:37:43 2006 thread fourth, workunit m Thu Jun 29 00:37:44 2006 thread last, workunit m Thu Jun 29 00:37:47 2006 thread second ending Thu Jun 29 00:37:47 2006 thread third ending Thu Jun 29 00:37:48 2006 thread fourth ending final message: spam Thu Jun 29 00:37:48 2006 thread last ending _______________________________________________ Tutor maillist - Tutor@python.org http://mail.python.org/mailman/listinfo/tutor