Author: rhs Date: Wed Sep 2 15:35:42 2009 New Revision: 810573 URL: http://svn.apache.org/viewvc?rev=810573&view=rev Log: changed Lockable -> Waiter and switched its usage from is-a to has-a; also fixed some more imports
Added: qpid/trunk/qpid/python/qpid/concurrency.py (contents, props changed) - copied, changed from r810508, qpid/trunk/qpid/python/qpid/lockable.py Removed: qpid/trunk/qpid/python/qpid/lockable.py Modified: qpid/trunk/qpid/python/qpid/driver.py qpid/trunk/qpid/python/qpid/messaging.py Copied: qpid/trunk/qpid/python/qpid/concurrency.py (from r810508, qpid/trunk/qpid/python/qpid/lockable.py) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/concurrency.py?p2=qpid/trunk/qpid/python/qpid/concurrency.py&p1=qpid/trunk/qpid/python/qpid/lockable.py&r1=810508&r2=810573&rev=810573&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/lockable.py (original) +++ qpid/trunk/qpid/python/qpid/concurrency.py Wed Sep 2 15:35:42 2009 @@ -26,11 +26,11 @@ exec """ def %s%s: %s - %s.lock() + %s._lock.acquire() try: return meth%s finally: - %s.unlock() + %s._lock.release() """ % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), repr(inspect.getdoc(meth)), args[0], inspect.formatargspec(args, vargs, kwargs, defs, @@ -38,13 +38,10 @@ args[0]) in scope return scope[meth.__name__] -class Lockable(object): +class Waiter(object): - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() + def __init__(self, condition): + self.condition = condition def wait(self, predicate, timeout=None): passed = 0 @@ -53,16 +50,16 @@ if timeout is None: # using the timed wait prevents keyboard interrupts from being # blocked while waiting - self._condition.wait(3) + self.condition.wait(3) elif passed < timeout: - self._condition.wait(timeout - passed) + self.condition.wait(timeout - passed) else: return False passed = time.time() - start return True def notify(self): - self._condition.notify() + self.condition.notify() def notifyAll(self): - self._condition.notifyAll() + self.condition.notifyAll() Propchange: qpid/trunk/qpid/python/qpid/concurrency.py 
------------------------------------------------------------------------------ svn:mergeinfo = Modified: qpid/trunk/qpid/python/qpid/driver.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=810573&r1=810572&r2=810573&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/driver.py (original) +++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep 2 15:35:42 2009 @@ -18,11 +18,11 @@ # import compat, connection, socket, sys, time +from concurrency import synchronized from datatypes import RangedSet, Message as Message010 from exceptions import Timeout -from lockable import synchronized, Lockable from logging import getLogger -from messaging import get_codec, Message, Pattern, UNLIMITED +from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED from ops import delivery_mode from session import Client, INCOMPLETE, SessionDetached from threading import Condition, Thread @@ -63,12 +63,11 @@ return handler._message_transfer(session, cmd) return Delegate -class Driver(Lockable): +class Driver: def __init__(self, connection): self.connection = connection self._lock = self.connection._lock - self._condition = self.connection._condition self._wakeup_cond = Condition() self._socket = None self._conn = None @@ -134,7 +133,7 @@ self.connection.error = (msg,) self._modcount = modcount - self.notifyAll() + self.connection._waiter.notifyAll() def connect(self): if self._conn is not None: @@ -177,7 +176,7 @@ _ssn.auto_sync = False _ssn.invoke_lock = self._lock _ssn.lock = self._lock - _ssn.condition = self._condition + _ssn.condition = self.connection._condition if ssn.transactional: # XXX: adding an attribute to qpid.session.Session _ssn.acked = [] @@ -422,7 +421,7 @@ rcv.received += 1 log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) - self.notifyAll() + self.connection._waiter.notifyAll() return INCOMPLETE def _decode(self, message): Modified: 
qpid/trunk/qpid/python/qpid/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=810573&r1=810572&r2=810573&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 2 15:35:42 2009 @@ -31,8 +31,8 @@ """ from codec010 import StringCodec +from concurrency import synchronized, Waiter from datatypes import timestamp, uuid4, Serial -from lockable import synchronized, Lockable from logging import getLogger from ops import PRIMITIVE from threading import Thread, RLock, Condition @@ -69,7 +69,7 @@ """ pass -class Connection(Lockable): +class Connection: """ A Connection manages a group of L{Sessions<Session>} and connects @@ -114,19 +114,23 @@ self._connected = False self._lock = RLock() self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) self._modcount = Serial(0) self.error = None from driver import Driver self._driver = Driver(self) self._driver.start() + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + def _wakeup(self): self._modcount += 1 self._driver.wakeup() def _catchup(self, exc=ConnectionError): mc = self._modcount - self.wait(lambda: not self._driver._modcount < mc) + self._wait(lambda: not self._driver._modcount < mc) self._check_error(exc) def _check_error(self, exc=ConnectionError): @@ -134,7 +138,7 @@ raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ConnectionError): - result = self.wait(lambda: self.error or predicate(), timeout) + result = self._wait(lambda: self.error or predicate(), timeout) self._check_error(exc) return result @@ -255,7 +259,7 @@ class TransactionAborted(SessionError): pass -class Session(Lockable): +class Session: """ Sessions provide a linear context for sending and receiving @@ -287,7 +291,6 @@ self.closed = False self._lock = connection._lock - 
self._condition = connection._condition self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -295,6 +298,9 @@ def __repr__(self): return "<Session %s>" % self.name + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + def _wakeup(self): self.connection._wakeup() @@ -369,8 +375,8 @@ @synchronized def _get(self, predicate, timeout=None): - if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: msg._receiver.returned += 1 @@ -454,7 +460,7 @@ for rcv in self.receivers: rcv.stop() # TODO: think about stopping individual receivers in listen mode - self.wait(lambda: self._peek(self._pred) is None) + self._wait(lambda: self._peek(self._pred) is None) self.started = False def _pred(self, m): @@ -470,10 +476,10 @@ else: msg._receiver.listener(msg) if self._peek(self._pred) is None: - self.notifyAll() + self.connection._waiter.notifyAll() finally: self.closed = True - self.notifyAll() + self.connection._waiter.notifyAll() @synchronized def close(self): @@ -486,7 +492,7 @@ self.closing = True self._wakeup() self._catchup() - self.wait(lambda: self.closed) + self._wait(lambda: self.closed) while self.thread.isAlive(): self.thread.join(3) self.thread = None @@ -500,7 +506,7 @@ class InsufficientCapacity(SendError): pass -class Sender(Lockable): +class Sender: """ Sends outgoing messages. @@ -515,7 +521,6 @@ self.acked = Serial(0) self.closed = False self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() @@ -598,7 +603,7 @@ """ pass -class Receiver(Lockable): +class Receiver: """ Receives incoming messages from a remote source. 
Messages may be @@ -625,7 +630,6 @@ self.closed = False self.listener = None self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org