Re: [Tutor] Starbucks does not use two-phase commit
Hi Danny,

Just to report something regarding the code you have provided.

I noticed that when I do a keyboard interrupt, I get the keyboard interrupt exception messages, but after that it keeps hanging and never returns to the command line input mode. I have to close the shell to really end the program afaics.

After reading Todd's emails, I wondered if it had something to do with how the threaded queue is started.

In your example, you use:

    Thread( target = function name ).start()

While I have been using the thread module, using:

    thread.start_new( function name, ( thread id, other args ) )

When I implemented your example in my program, I also used your approach, and started having the hanging behavior. I reconverted the thread spawn to my own approach, just to see if it would make a difference. And with my approach, the keyboard interrupt really aborts the program, and the shell returns to command line input mode.

Any idea why those two methods of starting threads can behave so differently?

Thanks
Bernard

On 1/20/06, Bernard Lebel [EMAIL PROTECTED] wrote:

Ultra-nice Danny, it works (ie: I have implemented your example in my code and it runs as it should).

I have to admit that I'm a little bit puzzled by this chain of instantiation and argument passing, however I intend to decipher it properly.

Thanks again!
Bernard

On 1/20/06, Danny Yoo [EMAIL PROTECTED] wrote:

On Fri, 20 Jan 2006, Bernard Lebel wrote:

> So I have written a little test script. The fact is that I want to be able to manage the same queue from separate threads. Below is an example of what my real program is doing:

Hi Bernard,

One problem is that there's a single outputQueue being presented to get results back from the Server.

A different approach is to use a lot of outputQueues. *grin*

The idea is that when we send a job submission, we immediately get back a ticket. We can then use this ticket to claim() our result.
Each ticket is unique to a job submission, so we shouldn't see any bleeding going on between clients.

Here's some code that implements this idea. It's a little complex, so you may want to read through it slowly:

######
    from threading import Thread
    from Queue import Queue


    class Ticket:
        """A small token that we can use to claim our result."""
        def __init__(self, q):
            self.q = q
            self.result = None
            self.done = False

        def claim(self):
            if not self.done:
                self.result = self.q.get()
                self.done = True
            return self.result


    class Server:
        _QUIT_JOB = ['Quit!']

        def __init__(self):
            """A queue will contain 2-tuples of (job, outputQueue)
            elements."""
            self.queue = Queue()

        def startServer(self):
            """Brings the server online."""
            Thread(target=self._jobLoop).start()

        def schedule(self, job):
            """Schedules a job to be done and returns a ticket that the
            client can use later to claim the result of the job."""
            outputQueue = Queue()
            self.queue.put((job, outputQueue))
            return Ticket(outputQueue)

        def scheduleShutdown(self):
            """Add a 'job' that shuts the system down."""
            self.queue.put((Server._QUIT_JOB, None))

        def _jobLoop(self):
            """Continue looping through tasks."""
            while True:
                print "Looping..."
                (nextJob, outputQueue) = self.queue.get()
                if nextJob is Server._QUIT_JOB:
                    return
                returnValue = self._doJob(nextJob)
                outputQueue.put(returnValue)

        def _doJob(self, job):
            print "I'm doing", job
            return job + job ## something to show that we're doing something


    def separateCaller(server):
        for i in range(1000, 1004 + 1):
            print "--Separate caller asks %d" % i
            ticket = server.schedule(str(i))
            print "--Separate caller got %s" % ticket.claim()


    if __name__ == '__main__':
        server = Server()
        server.startServer()
        Thread(target=separateCaller, args=(server,)).start()

        result1 = server.schedule(1)
        print "result1: %s" % result1.claim()
        result2 = server.schedule(2)
        print "result2: %s" % result2.claim()
        result3 = server.schedule(3)
        print "result3: %s" % result3.claim()
        server.scheduleShutdown()
######

Play around with this a bit and see if it makes sense to you.
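For anyone reading along on Python 3, the standard library's concurrent.futures module has something that looks a lot like the ticket idea: submit() hands back a Future right away, and calling result() on it plays roughly the role of claim(). Here is a minimal sketch under that assumption. The names do_job and run_jobs are made up for this illustration and are not part of the code in this thread, and a one-worker executor is only a rough stand-in for the single server thread.

```python
from concurrent.futures import ThreadPoolExecutor


def do_job(job):
    """Hypothetical stand-in for Server._doJob: double the job."""
    return job + job


def run_jobs(jobs):
    """Submit every job first, then claim each result through its Future."""
    # max_workers=1 roughly mirrors the single server thread.
    with ThreadPoolExecutor(max_workers=1) as executor:
        # submit() returns immediately with a Future (our "ticket").
        tickets = [executor.submit(do_job, job) for job in jobs]
        # result() blocks until that particular job is done, like claim().
        return [ticket.result() for ticket in tickets]


if __name__ == "__main__":
    print(run_jobs(["1", "2", "3"]))
```

Each Future belongs to exactly one submission, so results cannot bleed between callers, which is the same property the per-job outputQueue gives us.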
You might also be interested in the amusing article Starbucks Does Not Use Two-Phase Commit: http://www.eaipatterns.com/ramblings/18_starbucks.html as it touches on concepts in the code. If you have more questions, please feel free to ask! ___ Tutor maillist - Tutor@python.org
Re: [Tutor] Starbucks does not use two-phase commit
> I noticed that when I do a keyboard interrupt, I get the keyboard interrupt exception messages, but after that it keeps hanging and never returns to the command line input mode. I have to close the shell to really end the program afaics.

Hi Bernard,

When we're using the high-level 'threading' interface, the server thread --- which runs independently of the main thread --- will continue to run until it's shut down.

But we can call setDaemon() on the server's thread to allow Python to exit. Look at the bottom of:

    http://www.python.org/doc/lib/thread-objects.html

So we can modify server.startServer() to flag the server thread as a daemon:

    def startServer(self):
        t = Thread(target=self._jobLoop)
        t.setDaemon(True)
        t.start()

> While I have been using the thread module, using:
>
>     thread.start_new( function name, ( thread id, other args ) )
>
> When I implemented your example in my program, I also used your approach, and started having the hanging behavior. I reconverted the thread spawn to my own approach, just to see if it would make a difference.

The low level 'thread' library is a bit more platform specific. Here's what the documentation says:

    (http://www.python.org/doc/lib/module-thread.html)

    When the main thread exits, it is system defined whether the other threads survive. On SGI IRIX using the native thread implementation, they survive. On most other systems, they are killed without executing try ... finally clauses or executing object destructors.

Does this make sense?
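One way to see the difference for yourself is to run a tiny program both ways and check whether the interpreter exits on its own. The sketch below assumes Python 3, where the module is spelled queue and Thread accepts a daemon keyword. The helper name exits_on_its_own and the child program are invented for this illustration.

```python
import subprocess
import sys
import textwrap

# Child program: a worker thread blocks forever on an empty queue.
# The placeholder DAEMON_FLAG is substituted so we can run it both ways.
CHILD_SOURCE = textwrap.dedent("""
    import queue
    import threading

    jobs = queue.Queue()
    worker = threading.Thread(target=jobs.get, daemon=DAEMON_FLAG)
    worker.start()
    # The main thread ends here while the worker is still blocked.
""")


def exits_on_its_own(daemon, timeout=5.0):
    """Return True if the child interpreter exits within `timeout` seconds."""
    source = CHILD_SOURCE.replace("DAEMON_FLAG", repr(daemon))
    try:
        subprocess.run([sys.executable, "-c", source],
                       timeout=timeout, check=True)
        return True
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the child when the timeout expires.
        return False
```

With daemon=True the child should return promptly; with daemon=False the interpreter keeps waiting for the blocked worker, which matches the hanging shell described above.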
Re: [Tutor] Starbucks does not use two-phase commit
Hi Danny,

Yes that makes sense, but.. what is a daemon? Sorry if this is super basic question.

Thanks
Bernard

On 1/23/06, Danny Yoo [EMAIL PROTECTED] wrote:
[snip]
Re: [Tutor] Starbucks does not use two-phase commit
On Mon, 23 Jan 2006, Bernard Lebel wrote:

> Yes that makes sense, but.. what is a daemon? Sorry if this is super basic question.

According to:

    http://docs.python.org/lib/thread-objects.html

    A thread can be flagged as a ``daemon thread''. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set with the setDaemon() method and retrieved with the isDaemon() method.

So that's what "daemon" technically does when we apply that term to a thread. But what it means to us humans is up to interpretation: I think of daemon threads as being more ephemeral than other threads. Not sure if that makes any sense to anyone besides myself, though. *grin*

There's a traditional use of the word "daemon" that deals with programs that run in the background:

    http://en.wikipedia.org/wiki/Daemon_(computer_software)

so the word "daemon" is, like most words, a bit overloaded. *grin*
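The "inherited from the creating thread" part is easy to miss, so here is a small sketch that checks it. It assumes Python 3, where the flag is exposed as the daemon attribute and as a constructor keyword. The function names are invented for this example.

```python
import threading


def spawn_and_report(report):
    """Create a child thread without setting its flag; record what it got."""
    child = threading.Thread(target=lambda: None)
    # No daemon argument was given, so the child takes the flag of the
    # thread that is running this function.
    report.append(child.daemon)
    child.start()
    child.join()


def inherited_flag(parent_is_daemon):
    """Return the daemon flag a child inherits from its parent thread."""
    report = []
    parent = threading.Thread(target=spawn_and_report, args=(report,),
                              daemon=parent_is_daemon)
    parent.start()
    parent.join()
    return report[0]
```

So a thread started from a daemon thread is itself a daemon unless we say otherwise, and a thread started from the main thread is not.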
Re: [Tutor] Starbucks does not use two-phase commit
Thanks a lot Danny.

Bernard

On 1/23/06, Danny Yoo [EMAIL PROTECTED] wrote:
[snip]
Re: [Tutor] Starbucks does not use two-phase commit
On Sat, 21 Jan 2006, Todd Maynard wrote:

> I want to thank you for ruining my plans for a relaxing Saturday morning. As a thread newbie I killed several hours playing around with your code.

Hi Todd,

Sorry about that. I hope you were relaxing in a cafe while playing with the code.

> One thing I noticed is that sometimes the program would hang, which I figured was the Queue code blocking in the Ticket claim function. I used exception handling to deal with that situation cleanly.

That's odd. There shouldn't be anything that blocks the code. Oh! Did you make changes to the test code, or did the hanging occur in the original code in:

    http://mail.python.org/pipermail/tutor/2006-January/044567.html

I'm curious because nothing there should fundamentally block, assuming that _doJob() doesn't die badly with an exception. If _doJob() dies, the server dies, and that's bad. *grin*

Do you mind showing what the exception handling looks like in your code?

> I then decided that it wasn't very nice of Starbucks to close after accepting my order without giving me my Latte, so I changed that part of the code to:

[code cut]

> I am 99.44% sure that this is thread safe, reasoning being: setting the acceptNew to False and adding the QUIT_NOW happens in the same thread so it is impossible for another job to get scheduled after the QUIT_NOW - so no thread will end up hanging...

Bad news: no. *grin*

There's a race condition. Let's go into some detail with this, since this is not obvious stuff.

First, let's look at the code again --- I'll label three lines with (a), (b), and (c), to make it a little easier to see the race.

###
    def schedule(self,job):
        if self.acceptNew == True:                   ## (a)
            outputQueue=Queue()
            self.queue.put((job,outputQueue))
            return Ticket(outputQueue)
        else:
            print "Server not accepting any new requests."
            return None

    def scheduleShutdown(self):                      ## (b)
        self.queue.put((Server._QUIT_NICELY,None))

    def _jobLoop(self):
        while True:
            print "Looping ..."
            (nextJob, outputQueue) = self.queue.get()
            if nextJob is Server._QUIT_NOW:
                return
            if nextJob is Server._QUIT_NICELY:       ## (c)
                self.acceptNew = False
                self.queue.put((Server._QUIT_NOW,None))
            else:
                returnValue=self._doJob(nextJob)
                outputQueue.put(returnValue)
###

Let's imagine three threads, which I'll name C1, C2, and S. C1 and C2 will be distinct client threads, and S will be the server thread that runs through _jobLoop().

Imagine the following scenario. The server's online, and its work queue is empty.

    1. C1 calls schedule(), and reaches the line labeled (a). At this point, server.acceptNew is True, so it goes into the body of the if statement. But wait...

    2. Now we context switch to C2. C2 calls scheduleShutdown() in its entirety. There is now a _QUIT_NICELY element in the queue. C2 is done for.

    3. Now we context switch to the server thread S. It grabs the _QUIT_NICELY, and puts a _QUIT_NOW. Let's imagine that S continues and loops again. In the next loop through _jobLoop(), it sees _QUIT_NOW and exits. S is done for. Muhahaha.

    4. Now we context switch back to C1 and continue with:

           outputQueue = Queue()
           self.queue.put((job,outputQueue))
           return Ticket(outputQueue)

In this scenario, poor C1 is left holding a ticket that will never cash out.

One way to fix this problem is to make calling schedule() and scheduleShutdown() atomic in this sense: if we're calling schedule(), we shouldn't be able to context switch into a call to scheduleShutdown(), and vice versa.

Our troubles started at step 2 of the above scenario, where two clients jostled for attention. If we prevent that particular situation --- if we force all our clients to stand in line to get served --- then we'll be fine.
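Races like this are hard to trigger on demand, but the four steps can be replayed by hand in a single thread to see where C1's job ends up. This is only a sketch of the interleaving, not a real concurrent test. It assumes Python 3 (queue instead of Queue), and replay_race is a name made up for this illustration.

```python
from queue import Queue, Empty

QUIT_NICELY = ["Quit Nicely"]
QUIT_NOW = ["Quit!"]


def replay_race():
    """Replay steps 1-4 by hand; return True if C1's ticket never pays out."""
    work = Queue()
    accept_new = True

    # Step 1: C1 passes the check at (a), then gets switched out.
    c1_passed_check = accept_new

    # Step 2: C2 runs scheduleShutdown() in its entirety.
    work.put((QUIT_NICELY, None))

    # Step 3: S handles _QUIT_NICELY, queues _QUIT_NOW, loops, and exits.
    job, _ = work.get()
    assert job is QUIT_NICELY
    accept_new = False
    work.put((QUIT_NOW, None))
    job, _ = work.get()
    assert job is QUIT_NOW  # the server thread would return here

    # Step 4: C1 resumes and enqueues its job for a server that is gone.
    assert c1_passed_check
    output = Queue()
    work.put(("latte", output))

    try:
        output.get(timeout=0.1)
    except Empty:
        return True  # nobody is left to put a result on this queue
    return False
```

The job is sitting in the work queue at the end, but no thread is left to take it, so a claim() without a timeout would block forever.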
So we might look into some synchronizing tool, like a Lock object:

    http://www.python.org/doc/lib/lock-objects.html

Concretely, we can add an exclusive Lock object to the server's __init__:

    def __init__(self):
        self.clientLock = Lock()

and make sure to acquire-and-release in any of our schedule* functions:

    def schedule*(self, job):
        self.clientLock.acquire()
        try:
            ...
        finally:
            self.clientLock.release()

> However, I would sleep a little better if you could reassure me that I am right, and would sleep even better if you could give me a method to test this.

I'm sorry; I can't provide either. That doesn't mean that such things don't exist, but only that I don't know about them. (The only formal training I've received on this, so far,
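For what it's worth, here is one way the Lock idea might look when written out in full. It is a sketch under a few assumptions, not the code from this thread: it targets Python 3, it hands back the per-job output queue directly instead of a Ticket to stay short, and it flips accept_new inside schedule_shutdown() under the same lock (the posted code flips it in the server thread). That last change is my own variation, made so that no job can land behind the quit marker.

```python
import threading
from queue import Queue


class Server:
    _QUIT = ["Quit!"]

    def __init__(self):
        self.queue = Queue()  # holds (job, output_queue) pairs
        self.accept_new = True
        self.client_lock = threading.Lock()
        self.thread = threading.Thread(target=self._job_loop, daemon=True)

    def start(self):
        self.thread.start()

    def schedule(self, job):
        """Return a per-job output queue, or None after shutdown began."""
        with self.client_lock:  # acquired here, released even on error
            if not self.accept_new:
                return None
            output = Queue()
            self.queue.put((job, output))
            return output

    def schedule_shutdown(self):
        with self.client_lock:
            # Assumption: the flag changes here, under the same lock, so
            # every accepted job is queued ahead of the quit marker.
            self.accept_new = False
            self.queue.put((Server._QUIT, None))

    def _job_loop(self):
        while True:
            job, output = self.queue.get()
            if job is Server._QUIT:
                return
            output.put(job + job)


def demo():
    """Schedule three jobs, shut down, then try one job too late."""
    server = Server()
    server.start()
    outputs = [server.schedule(str(i)) for i in range(3)]
    server.schedule_shutdown()
    late = server.schedule("too late")
    results = [out.get(timeout=5) for out in outputs]
    server.thread.join(timeout=5)
    return results, late
```

Because clients now stand in line, a job is either queued before the quit marker and gets served, or it is refused outright; nobody is left holding a dead ticket.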
Re: [Tutor] Starbucks does not use two-phase commit
Well Danny, now I know how I am gonna spend my Sunday.

Thanks for the great explanation and the resources. Of course, do you think I could manage to get the code to break? Of course not. Usually I have the opposite problem. Anyways, I think that your explanation makes perfect sense.

My problem with your original code is that _jobLoop could sometimes return when there were still jobs (from separateCaller) left in the queue. When separateCaller tried to ticket.claim(), the self.result = self.q.get() would block, causing the program to hang indefinitely. This is what I was trying to prevent by using the timeout in the get() call and then handling the possible Empty exception.

I am now gonna play with this some more to see if I can build a robust/clean coffeeshop framework, with customers placing orders with a cashier, the cashier passing the orders to a barista, and the barista processing the orders and delivering to the customers. The idea of course being that the customers, cashier, and baristas each run in different threads. Then to enhance with multiple cashiers and baristas. But first I need to put another pot of coffee on.

If you don't hear from me in a while, I've probably suffered a caffeine overdose.

Thanks for the inspiration,

Todd Maynard

--
The tao that can be tar(1)ed is not the entire Tao.
The path that can be specified is not the Full Path.
We declare the names of all variables and functions.
Yet the Tao has no type specifier.
Dynamically binding, you realize the magic.
Statically binding, you see only the hierarchy.
Yet magic and hierarchy arise from the same source, and this source has a null pointer.
Reference the NULL within NULL, it is the gateway to all wizardry.

On Sunday 22 January 2006 03:13, Danny Yoo wrote:
[snip]
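The timeout approach mentioned in this message, a get() with a timeout plus a handler for Empty, might look like this in Python 3. It is a sketch only: the module is queue rather than Queue, and the configurable timeout parameter is an addition made for this example.

```python
from queue import Queue, Empty


class Ticket:
    """Claim a result, but give up after `timeout` seconds."""

    def __init__(self, q, timeout=5.0):
        self.q = q
        self.timeout = timeout
        self.result = None
        self.done = False

    def claim(self):
        if not self.done:
            try:
                # Block for at most `timeout` seconds waiting for a result.
                self.result = self.q.get(timeout=self.timeout)
                self.done = True
            except Empty:
                # Nothing arrived in time; the server may have shut down.
                self.result = None
        return self.result
```

One caveat with this shape: a None from claim() cannot be told apart from a job whose real result is None, so callers who care may want to check ticket.done as well.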
Re: [Tutor] Starbucks does not use two-phase commit
Danny,

I want to thank you for ruining my plans for a relaxing Saturday morning. As a thread newbie I killed several hours playing around with your code.

One thing I noticed is that sometimes the program would hang, which I figured was the Queue code blocking in the Ticket claim function. I used exception handling to deal with that situation cleanly.

I then decided that it wasn't very nice of Starbucks to close after accepting my order without giving me my Latte, so I changed that part of the code to:

    def schedule(self,job):
        """Schedules a job and returns a ticket the user can later use
        to get the result."""
        if self.acceptNew == True:
            outputQueue=Queue()
            self.queue.put((job,outputQueue))
            return Ticket(outputQueue)
        else:
            print "Server not accepting any new requests."
            return None

    def scheduleShutdown(self):
        """Add a job that shuts the system down."""
        print "Telling server to shut down"
        self.queue.put((Server._QUIT_NICELY,None))

    def _jobLoop(self):
        """Continue looping through tasks."""
        while True:
            print "Looping ..."
            (nextJob, outputQueue) = self.queue.get()
            if nextJob is Server._QUIT_NOW:
                return
            if nextJob is Server._QUIT_NICELY:
                self.acceptNew = False
                self.queue.put((Server._QUIT_NOW,None))
            else:
                returnValue=self._doJob(nextJob)
                outputQueue.put(returnValue)

I am 99.44% sure that this is thread safe, reasoning being: setting the acceptNew to False and adding the QUIT_NOW happens in the same thread so it is impossible for another job to get scheduled after the QUIT_NOW - so no thread will end up hanging...

However, I would sleep a little better if you could reassure me that I am right, and would sleep even better if you could give me a method to test this. This kinda stuff looks tricky to test with standard unittest methodology.

Thanks again for the enlightenment all you guys bring to this awesome language.

My bastardized code is below for reference.

--Todd Maynard [EMAIL PROTECTED]

[It is] best to confuse only one issue at a time.
    -- K&R

**

    from threading import Thread
    from Queue import Queue,Empty

    class Ticket(object):
        """A small token we can use to claim our result."""
        def __init__(self, q):
            self.q = q
            self.result = None
            self.done=False

        def claim(self):
            if not self.done:
                try:
                    self.result=self.q.get(True,5)
                    self.done=True
                except Empty:
                    print "We lost the server!"
                    self.result=None
            return self.result


    class Server(object):
        _QUIT_NOW=['Quit!']
        _QUIT_NICELY=['Quit Nicely']

        def __init__(self):
            """A queue will contain 2-tuples of (job, outputQueue)
            elements."""
            self.queue=Queue()
            self.acceptNew=True

        def startServer(self):
            """Brings the server online."""
            Thread(target=self._jobLoop).start()

        def schedule(self,job):
            """Schedules a job and returns a ticket the user can later
            use to get the result."""
            if self.acceptNew == True:
                outputQueue=Queue()
                self.queue.put((job,outputQueue))
                return Ticket(outputQueue)
            else:
                print "Server not accepting any new requests."
                return None

        def scheduleShutdown(self):
            """Add a job that shuts the system down."""
            print "Telling server to shut down"
            self.queue.put((Server._QUIT_NICELY,None))

        def _jobLoop(self):
            """Continue looping through tasks."""
            while True:
                print "Looping ..."
                (nextJob, outputQueue) = self.queue.get()
                if nextJob is Server._QUIT_NOW:
                    return
                if nextJob is Server._QUIT_NICELY:
                    self.acceptNew = False
                    self.queue.put((Server._QUIT_NOW,None))
                else:
                    returnValue=self._doJob(nextJob)
                    outputQueue.put(returnValue)

        def _doJob(self,job):
            print "I'm doing ", job
            return job + job #Something to show that we are doing something


    def separateCaller(server):
        for i in range(1000,1004+1):
            print "-- separateCaller asks %d" % i
            ticket = server.schedule(str(i))
            if ticket:
                print "-- separateCaller got %s" % ticket.claim()
            else:
                print "-- separateCaller couldn't get a ticket."


    if __name__=="__main__":
        server = Server()
        server.startServer()
        Thread(target=separateCaller, args=(server,)).start()

        result1=server.schedule(1)