Re: Threading.Condition problem
Piet van Oostrum wrote: Gabriel Rossetti gabriel.rosse...@arimaz.com (GR) wrote: GR Sorry if this appears twice, I sent it once with an attachment and it never GR arrived so maybe the attachment is posing problems. I inlined the code this GR time (at the bottom), thank you, GR Gabriel GR ## Original message GR Hello everyone, GR I wrote a small example that listens for xmpp msgs in a thread. The main GR program calls a function that blocks (using Condition.wait) until a msg GR has been received and then returns the msg. When a msg arrives, it is GR put in a variable in the thread's object, it then calls the notify() GR attr on the Condition object. For some reason, this doesn't work, the GR thread gets the msg, tries to notify the Condition object, fails because GR the lock has not been acquired yet and blocks. I tried ignoring the GR failure, thinking that since it has not been acquired yet then when it GR is, it will get the msg right away and never call Condition.wait, thus GR not causing any problems, but this does not work either. Does someone GR know what I am doing wrong? I attached the code to this msg. The code that puts the message in the variable should also acquire the lock: def onMessage(self, conn, msg): with self._cv: self.message = msg self._cv.notify() Thank you, that was the problem, I eventually found that A couple of remarks: 1. I think the code is neater if all manipulation with the condition is done in the same class (actually in the same instance -- making this instance into a monitor). The reason I didn't do that is that I don' t want the Listener to sleep, I maybe over simplified the example, I actually put them in a dictionary as they come in, so in your example, if I have several threads waiting on msgs it wouldn't work. I'm trying to make a webservice api thay will also be turned into a java .jar for people that need java. Now that I think about it, each session will have an instance of the object so msgs shouldn' t get mixed up (one connection per user), so I could block in the thread. I'll try your suggestion as I think it is cleaner. class Listener(Thread): def __init__(self, ws): Thread.__init__(self) self.interrupt = Event() self.message = None self._cv = Condition() self.client = ws._client self.client.RegisterHandler('message', self.onMessage) def onMessage(self, conn, msg): with self._cv: self.message = msg try: self._cv.notify() except RuntimeError: print self._cv has not acquired the lock yet def getMsg(self): with self._cv: while !self.message self._cv.wait() return self.message class WS(object): def __init__(self, username, password, res): self._jid = xmpp.protocol.JID(username) self._client = xmpp.Client(self._jid.getDomain()) #self._cv = Condition() def getMsg(self, mid=None): return self._listener.getMsg() Of course I haven't tested this code as I don't have the context modules. 2. I don't know if more than one message can be delivered in the same instance. If yes, than your code will not work, and neither will the code above as, the message instance variable is never cleared. So the next getMsg will be happy to deliver the previous one. You would have to clear it when returning this one. Like I said above, in reality I have a dict not just a simple variable. def getMsg(self): with self._cv: while !self.message self._cv.wait() msg = self.message self.message = None return msg 3. If the messages come in faster than they can be processed some will be lost as they will overwrite the previous one in the self.message variable. The solution is to use a threading.Queue to transfer the messages from one thread to the other. This also saves you the hassle of doing your own synchronisation like above. If you are not familiar with synchronising multithreaded applications it is very easy to make errors and even if you are it is quite easy to do them wrong. I have been involved in distributed programming courses at university level and I have seen many errors in this area. I used a dict because the API can also be setup to be async and use callbacks, so I had to be able to access the msgs directly and quickly. Gabriel -- http://mail.python.org/mailman/listinfo/python-list
Re: Threading.Condition problem
Gabriel Rossetti gabriel.rosse...@arimaz.com (GR) wrote: GR Piet van Oostrum wrote: ... GR I wrote a small example that listens for xmpp msgs in a thread. The main GR program calls a function that blocks (using Condition.wait) until a msg GR has been received and then returns the msg. When a msg arrives, it is GR put in a variable in the thread's object, it then calls the notify() GR attr on the Condition object. For some reason, this doesn't work, the GR thread gets the msg, tries to notify the Condition object, fails because GR the lock has not been acquired yet and blocks. I tried ignoring the GR failure, thinking that since it has not been acquired yet then when it GR is, it will get the msg right away and never call Condition.wait, thus GR not causing any problems, but this does not work either. Does someone GR know what I am doing wrong? I attached the code to this msg. The code that puts the message in the variable should also acquire the lock: def onMessage(self, conn, msg): with self._cv: self.message = msg self._cv.notify() GR Thank you, that was the problem, I eventually found that A couple of remarks: 1. I think the code is neater if all manipulation with the condition is done in the same class (actually in the same instance -- making this instance into a monitor). GR The reason I didn't do that is that I don' t want the Listener to sleep, I GR maybe over simplified the example, I actually put them in a dictionary as GR they come in, so in your example, if I have several threads waiting on msgs GR it wouldn't work. I'm trying to make a webservice api thay will also be GR turned into a java .jar for people that need java. Now that I think about GR it, each session will have an instance of the object so msgs shouldn' t get GR mixed up (one connection per user), so I could block in the thread. I'll GR try your suggestion as I think it is cleaner. Sleeping as you call it is better than busy waiting. You must have some synchronisation to make it efficient. If you put the messages in a dictionary access to the dictionary must be protected. Having several threads waiting for the messages doesn't prevent you from using proper synchronisation. Maybe you must use notify_all instead of notify. -- Piet van Oostrum p...@cs.uu.nl URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4] Private email: p...@vanoostrum.org -- http://mail.python.org/mailman/listinfo/python-list
Re: Threading.Condition problem
Gabriel Rossetti gabriel.rosse...@arimaz.com (GR) wrote: GR Sorry if this appears twice, I sent it once with an attachment and it never GR arrived so maybe the attachment is posing problems. I inlined the code this GR time (at the bottom), thank you, GR Gabriel GR ## Original message GR Hello everyone, GR I wrote a small example that listens for xmpp msgs in a thread. The main GR program calls a function that blocks (using Condition.wait) until a msg GR has been received and then returns the msg. When a msg arrives, it is GR put in a variable in the thread's object, it then calls the notify() GR attr on the Condition object. For some reason, this doesn't work, the GR thread gets the msg, tries to notify the Condition object, fails because GR the lock has not been acquired yet and blocks. I tried ignoring the GR failure, thinking that since it has not been acquired yet then when it GR is, it will get the msg right away and never call Condition.wait, thus GR not causing any problems, but this does not work either. Does someone GR know what I am doing wrong? I attached the code to this msg. The code that puts the message in the variable should also acquire the lock: def onMessage(self, conn, msg): with self._cv: self.message = msg self._cv.notify() A couple of remarks: 1. I think the code is neater if all manipulation with the condition is done in the same class (actually in the same instance -- making this instance into a monitor). class Listener(Thread): def __init__(self, ws): Thread.__init__(self) self.interrupt = Event() self.message = None self._cv = Condition() self.client = ws._client self.client.RegisterHandler('message', self.onMessage) def onMessage(self, conn, msg): with self._cv: self.message = msg try: self._cv.notify() except RuntimeError: print self._cv has not acquired the lock yet def getMsg(self): with self._cv: while !self.message self._cv.wait() return self.message class WS(object): def __init__(self, username, password, res): self._jid = xmpp.protocol.JID(username) self._client = xmpp.Client(self._jid.getDomain()) #self._cv = Condition() def getMsg(self, mid=None): return self._listener.getMsg() Of course I haven't tested this code as I don't have the context modules. 2. I don't know if more than one message can be delivered in the same instance. If yes, than your code will not work, and neither will the code above as, the message instance variable is never cleared. So the next getMsg will be happy to deliver the previous one. You would have to clear it when returning this one. def getMsg(self): with self._cv: while !self.message self._cv.wait() msg = self.message self.message = None return msg 3. If the messages come in faster than they can be processed some will be lost as they will overwrite the previous one in the self.message variable. The solution is to use a threading.Queue to transfer the messages from one thread to the other. This also saves you the hassle of doing your own synchronisation like above. If you are not familiar with synchronising multithreaded applications it is very easy to make errors and even if you are it is quite easy to do them wrong. I have been involved in distributed programming courses at university level and I have seen many errors in this area. -- Piet van Oostrum p...@cs.uu.nl URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4] Private email: p...@vanoostrum.org -- http://mail.python.org/mailman/listinfo/python-list
Threading.Condition problem
Hello everyone, I wrote a small example that listens for xmpp msgs in a thread. The main program calls a function that blocks (using Condition.wait) until a msg has been received and then returns the msg. When a msg arrives, it is put in a variable in the thread's object, it then calls the notify() attr on the Condition object. For some reason, this doesn't work, the thread gets the msg, tries to notify the Condition object, fails because the lock has not been acquired yet and blocks. I tried ignoring the failure, thinking that since it has not been acquired yet then when it is, it will get the msg right away and never call Condition.wait, thus not causing any problems, but this does not work either. Does someone know what I am doing wrong? I attached the code to this msg. Thank you, Gabriel # Copyright (c) 2001-2006 Twisted Matrix Laboratories. # See LICENSE for details. import sys from twisted.internet import reactor from twisted.names.srvconnect import SRVConnector from twisted.words.xish import domish, xpath from twisted.words.protocols.jabber import xmlstream, client, jid PRESENCE = '/presence' # this is an global xpath query to use in an observer MESSAGE = '/message' # message xpath IQ = '/iq' # iq xpath class XMPPClientConnector(SRVConnector): def __init__(self, reactor, domain, factory): SRVConnector.__init__(self, reactor, 'xmpp-client', domain, factory) def pickServer(self): host, port = SRVConnector.pickServer(self) if not self.servers and not self.orderedServers: # no SRV record, fall back.. port = 5222 return host, port class Client(object): def __init__(self, client_jid, secret, dest=None): self.dest = dest f = client.XMPPClientFactory(client_jid, secret) f.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected) f.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected) f.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated) f.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.init_failed) connector = XMPPClientConnector(reactor, client_jid.host, f) connector.connect() def rawDataIn(self, buf): print RECV: %s % unicode(buf, 'utf-8').encode('ascii', 'replace') def rawDataOut(self, buf): print SEND: %s % unicode(buf, 'utf-8').encode('ascii', 'replace') def connected(self, xs): print 'Connected.' self.xmlstream = xs # Log all traffic xs.rawDataInFn = self.rawDataIn xs.rawDataOutFn = self.rawDataOut def disconnected(self, xs): print 'Disconnected.' try: reactor.stop() except: pass def authenticated(self, xs): def sendPresence(): presence = domish.Element((None, 'presence')) xs.send(presence) def sendMsg(user, res=None): msg = domish.Element((jabber:client, message)) msg[to] = %...@localhost%s % (user, / + res if res else ) body = msg.addElement(body, content = Hello world %s % res if res else ) subject = msg.addElement(subject, content = will this be displayed?) thread = msg.addElement(thread, content = this shouldn't be displayed) xs.send(msg) print Authenticated. xs.addObserver(PRESENCE, self.onPresence, 1) #xs.addObserver(IQ, self.onIq, 1) xs.addObserver(MESSAGE, self.onMessage, 1) reactor.callLater(0, sendPresence) if(self.dest): reactor.callLater(2, sendMsg, self.dest, toto) reactor.callLater(4, sendMsg, self.dest, titi) #msg = domish.Element((jabber:client, message)) #msg[to] = grosse...@localhost #body = msg.addElement(body, content = Hello world) #subject = msg.addElement(subject, content = will this be displayed?) #thread = msg.addElement(thread, content = this shouldn't be displayed) #xs.send(msg) #msg = domish.Element((jabber:client, iq)) #msg[to] = ser...@localhost #msg[id] = 666 #xs.send(msg) #msg = domish.Element((jabber:client, presence)) #msg[to] = grosse...@localhost #msg[type] = subscribe #xs.send(msg) #reactor.callLater(5, xs.sendFooter) def onMessage(self, msg): Act on the message stanza that has just been received. # return to sender #msg = create_reply(msg) #self.xmlstream.send(msg) # send the modified domish.Element pass #def onIq(self, iq): # #Act on the iq stanza that has just been received. # #iq = create_reply(iq) #self.xmlstream.send(iq) def onPresence(self, prs): Act on the presence stanza that has just been received. t =
Re: Threading.Condition problem
The previous msg w/ attached code is the wrong code, please use the code attached to this msg, thank you and sorry for this. Gabriel Gabriel Rossetti wrote: Hello everyone, I wrote a small example that listens for xmpp msgs in a thread. The main program calls a function that blocks (using Condition.wait) until a msg has been received and then returns the msg. When a msg arrives, it is put in a variable in the thread's object, it then calls the notify() attr on the Condition object. For some reason, this doesn't work, the thread gets the msg, tries to notify the Condition object, fails because the lock has not been acquired yet and blocks. I tried ignoring the failure, thinking that since it has not been acquired yet then when it is, it will get the msg right away and never call Condition.wait, thus not causing any problems, but this does not work either. Does someone know what I am doing wrong? I attached the code to this msg. Thank you, Gabriel from __future__ import with_statement import xmpp, sys from threading import Thread, Condition, Event class Listener(Thread): def __init__(self, ws): Thread.__init__(self) self.interrupt = Event() self.message = None self._cv = ws._cv self.client = ws._client self.client.RegisterHandler('message', self.onMessage) def onMessage(self, conn, msg): self.message = msg try: self._cv.notify() except RuntimeError: print self._cv has not acquired the lock yet def getMsg(self): return self.message def run(self): try: while(not self.interrupt.isSet()): self.client.Process(1) except KeyboardInterrupt: return 0 class WS(object): def __init__(self, username, password, res): self._jid = xmpp.protocol.JID(username) self._client = xmpp.Client(self._jid.getDomain()) self._cv = Condition() if(self._client.connect(server=(localhost, 5222)) == ): raise Exception(Error while connecting!) if(self._client.auth(self._jid.getNode(), password, res) is None): raise Exception(Authentication failed!) self._client.sendInitPresence() self._listener = Listener(self) self._listener.start() def getMsg(self, mid=None): with self._cv: res = self._listener.getMsg() while not res: self._cv.wait() res = self._listener.getMsg() return res if(__name__ == __main__): ws = WS(t...@localhost, 123, test) res = ws.getMsg() print I just received : %s % str(res) sys.exit(0) -- http://mail.python.org/mailman/listinfo/python-list
Threading.Condition problem
Sorry if this appears twice, I sent it once with an attachment and it never arrived so maybe the attachment is posing problems. I inlined the code this time (at the bottom), thank you, Gabriel ## Original message Hello everyone, I wrote a small example that listens for xmpp msgs in a thread. The main program calls a function that blocks (using Condition.wait) until a msg has been received and then returns the msg. When a msg arrives, it is put in a variable in the thread's object, it then calls the notify() attr on the Condition object. For some reason, this doesn't work, the thread gets the msg, tries to notify the Condition object, fails because the lock has not been acquired yet and blocks. I tried ignoring the failure, thinking that since it has not been acquired yet then when it is, it will get the msg right away and never call Condition.wait, thus not causing any problems, but this does not work either. Does someone know what I am doing wrong? I attached the code to this msg. Thank you, Gabriel Example code from __future__ import with_statement import xmpp, sys from threading import Thread, Condition, Event class Listener(Thread): def __init__(self, ws): Thread.__init__(self) self.interrupt = Event() self.message = None self._cv = ws._cv self.client = ws._client self.client.RegisterHandler('message', self.onMessage) def onMessage(self, conn, msg): self.message = msg try: self._cv.notify() except RuntimeError: print self._cv has not acquired the lock yet def getMsg(self): return self.message def run(self): try: while(not self.interrupt.isSet()): self.client.Process(1) except KeyboardInterrupt: return 0 class WS(object): def __init__(self, username, password, res): self._jid = xmpp.protocol.JID(username) self._client = xmpp.Client(self._jid.getDomain()) self._cv = Condition() if(self._client.connect(server=(localhost, 5222)) == ): raise Exception(Error while connecting!) if(self._client.auth(self._jid.getNode(), password, res) is None): raise Exception(Authentication failed!) self._client.sendInitPresence() self._listener = Listener(self) self._listener.start() def getMsg(self, mid=None): with self._cv: res = self._listener.getMsg() while not res: self._cv.wait() res = self._listener.getMsg() return res if(__name__ == __main__): ws = WS(t...@localhost, 123, test) res = ws.getMsg() print I just received : %s % str(res) sys.exit(0) -- http://mail.python.org/mailman/listinfo/python-list