Author: rhs
Date: Thu Jun 19 17:23:39 2014
New Revision: 1603963
URL: http://svn.apache.org/r1603963
Log:
implemented final events for proton-j; added tests for final events
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jun 19 17:23:39 2014
@@ -2178,6 +2178,16 @@ class Endpoint(object):
def __init__(self):
self.condition = None
+ self._release_invoked = False
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if not self._release_invoked:
+ for c in self._children:
+ c._release()
+ self._free_resource()
+ self.connection._releasing(self)
+ self._release_invoked = True
def _update_cond(self):
obj2cond(self.condition, self._get_cond_impl())
@@ -2269,13 +2279,31 @@ class Connection(Endpoint):
def __del__(self):
if hasattr(self, "_conn") and self._conn:
- # pn_connection_free will release all child sessions in the C Engine, so
- # free all child python Sessions to avoid dangling references
- if hasattr(self, "_sessions") and self._sessions:
- for s in self._sessions:
- s._release()
- pn_connection_set_context(self._conn, None)
- pn_connection_free(self._conn)
+ self._release()
+
+ def free(self):
+ self._release()
+
+ @property
+ def _children(self):
+ return self._sessions
+
+ @property
+ def connection(self):
+ return self
+
+ def _free_resource(self):
+ pn_connection_free(self._conn)
+
+ def _released(self):
+ self._conn = None
+
+ def _releasing(self, child):
+ coll = getattr(self, "_collector", None)
+ if coll:
+ coll._contexts.add(child)
+ else:
+ child._released()
def _check(self, err):
if err < 0:
@@ -2389,16 +2417,15 @@ class Session(Endpoint):
self._links = set()
self.connection._sessions.add(self)
- def _release(self):
- """Release the underlying C Engine resource."""
- if self._ssn:
- # pn_session_free will release all child links in the C Engine, so free
- # all child python Links to avoid dangling references
- for l in self._links:
- l._release()
- pn_session_set_context(self._ssn, None)
- pn_session_free(self._ssn)
- self._ssn = None
+ @property
+ def _children(self):
+ return self._links
+
+ def _free_resource(self):
+ pn_session_free(self._ssn)
+
+ def _released(self):
+ self._ssn = None
def free(self):
"""Release the Session, freeing its resources.
@@ -2490,16 +2517,15 @@ class Link(Endpoint):
self._deliveries = set()
self.session._links.add(self)
- def _release(self):
- """Release the underlying C Engine resource."""
- if self._link:
- # pn_link_free will settle all child deliveries in the C Engine, so free
- # all child python deliveries to avoid dangling references
- for d in self._deliveries:
- d._release()
- pn_link_set_context(self._link, None)
- pn_link_free(self._link)
- self._link = None
+ @property
+ def _children(self):
+ return self._deliveries
+
+ def _free_resource(self):
+ pn_link_free(self._link)
+
+ def _released(self):
+ self._link = None
def free(self):
"""Release the Link, freeing its resources"""
@@ -2549,6 +2575,10 @@ class Link(Endpoint):
def session(self):
return Session._wrap_session(pn_link_session(self._link))
+ @property
+ def connection(self):
+ return self.session.connection
+
def delivery(self, tag):
return Delivery._wrap_delivery(pn_delivery(self._link, tag))
@@ -3278,6 +3308,7 @@ class Collector:
def __init__(self):
self._impl = pn_collector()
+ self._contexts = set()
def peek(self):
event = pn_collector_peek(self._impl)
@@ -3298,6 +3329,9 @@ class Collector:
transport=tp)
def pop(self):
+ ev = self.peek()
+ if ev is not None:
+ ev._popped(self)
pn_collector_pop(self._impl)
def __del__(self):
@@ -3333,6 +3367,19 @@ class Event:
self.delivery = delivery
self.transport = transport
+ def _popped(self, collector):
+ if self.type == Event.LINK_FINAL:
+ ctx = self.link
+ elif self.type == Event.SESSION_FINAL:
+ ctx = self.session
+ elif self.type == Event.CONNECTION_FINAL:
+ ctx = self.connection
+ else:
+ return
+
+ collector._contexts.remove(ctx)
+ ctx._released()
+
def __repr__(self):
objects = [self.connection, self.session, self.link, self.delivery,
self.transport]
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java Thu Jun 19 17:23:39 2014
@@ -81,6 +81,7 @@ public interface Transport extends Endpo
public int END_OF_STREAM = -1;
public void bind(Connection connection);
+ public void unbind();
public int capacity();
public ByteBuffer tail();
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java Thu Jun 19 17:23:39 2014
@@ -182,16 +182,30 @@ public class ConnectionImpl extends Endp
return this;
}
- public void free()
- {
- super.free();
- for(Session session : _sessions)
- {
+ @Override
+ void postFinal() {
+ put(Event.Type.CONNECTION_FINAL, this);
+ }
+
+ @Override
+ void doFree() {
+ for(Session session : _sessions) {
session.free();
}
_sessions = null;
}
+ void modifyEndpoints() {
+ if (_sessions != null) {
+ for (SessionImpl ssn: _sessions) {
+ ssn.modifyEndpoints();
+ }
+ }
+ if (!freed) {
+ modified();
+ }
+ }
+
void handleOpen(Open open)
{
// TODO - store state
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Thu Jun 19 17:23:39 2014
@@ -37,6 +37,24 @@ public abstract class EndpointImpl imple
private EndpointImpl _transportPrev;
private Object _context;
+ private int refcount = 1;
+ boolean freed = false;
+
+ void incref() {
+ refcount++;
+ }
+
+ void decref() {
+ refcount--;
+ if (refcount == 0) {
+ postFinal();
+ } else if (refcount < 0) {
+ throw new IllegalStateException();
+ }
+ }
+
+ abstract void postFinal();
+
protected abstract void localStateChanged();
public void open()
@@ -162,16 +180,20 @@ public abstract class EndpointImpl imple
return _transportPrev;
}
- public void free()
+ abstract void doFree();
+
+ final public void free()
{
- if(_transportNext != null)
- {
- _transportNext.setTransportPrev(_transportPrev);
- }
- if(_transportPrev != null)
- {
- _transportPrev.setTransportNext(_transportNext);
+ if (freed) return;
+ freed = true;
+
+ doFree();
+
+ if (_localState == EndpointState.ACTIVE) {
+ close();
}
+
+ decref();
}
void setTransportNext(EndpointImpl transportNext)
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Thu Jun 19 17:23:39 2014
@@ -76,6 +76,8 @@ class EventImpl implements Event
switch (type) {
case CONNECTION_REMOTE_STATE:
case CONNECTION_LOCAL_STATE:
+ case CONNECTION_INIT:
+ case CONNECTION_FINAL:
return (Connection) context;
case TRANSPORT:
Transport transport = getTransport();
@@ -97,6 +99,8 @@ class EventImpl implements Event
switch (type) {
case SESSION_REMOTE_STATE:
case SESSION_LOCAL_STATE:
+ case SESSION_INIT:
+ case SESSION_FINAL:
return (Session) context;
default:
Link link = getLink();
@@ -112,6 +116,8 @@ class EventImpl implements Event
switch (type) {
case LINK_REMOTE_STATE:
case LINK_LOCAL_STATE:
+ case LINK_INIT:
+ case LINK_FINAL:
case LINK_FLOW:
return (Link) context;
default:
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Thu Jun 19 17:23:39 2014
@@ -53,12 +53,13 @@ public abstract class LinkImpl extends E
private ReceiverSettleMode _remoteReceiverSettleMode;
- private final LinkNode<LinkImpl> _node;
+ private LinkNode<LinkImpl> _node;
private boolean _drain;
LinkImpl(SessionImpl session, String name)
{
_session = session;
+ _session.incref();
_name = name;
ConnectionImpl conn = session.getConnectionImpl();
_node = conn.addLinkEndpoint(this);
@@ -105,11 +106,21 @@ public abstract class LinkImpl extends E
}
}
- public void free()
+ @Override
+ void postFinal() {
+ _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this);
+ _session.decref();
+ }
+
+ @Override
+ void doFree()
{
- super.free();
_session.getConnectionImpl().removeLinkEndpoint(_node);
- //TODO.
+ _node = null;
+ }
+
+ void modifyEndpoints() {
+ modified();
}
public void remove(DeliveryImpl delivery)
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Thu Jun 19 17:23:39 2014
@@ -96,12 +96,11 @@ public class ReceiverImpl extends LinkIm
return consumed;
}
- public void free()
+ @Override
+ void doFree()
{
getSession().freeReceiver(this);
-
- super.free();
- //TODO.
+ super.doFree();
}
boolean hasIncoming()
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Thu Jun 19 17:23:39 2014
@@ -63,11 +63,11 @@ public class SenderImpl extends LinkImp
//TODO.
}
- public void free()
+ @Override
+ void doFree()
{
getSession().freeSender(this);
- super.free();
-
+ super.doFree();
}
@Override
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java Thu Jun 19 17:23:39 2014
@@ -45,6 +45,7 @@ public class SessionImpl extends Endpoin
SessionImpl(ConnectionImpl connection)
{
_connection = connection;
+ _connection.incref();
_node = _connection.addSessionEndpoint(this);
_connection.put(Event.Type.SESSION_INIT, this);
}
@@ -91,25 +92,38 @@ public class SessionImpl extends Endpoin
return getConnectionImpl();
}
- public void free()
- {
- super.free();
+ @Override
+ void postFinal() {
+ _connection.put(Event.Type.SESSION_FINAL, this);
+ _connection.decref();
+ }
+ @Override
+ void doFree() {
_connection.removeSessionEndpoint(_node);
_node = null;
- for(SenderImpl sender : _senders.values())
- {
+ for(SenderImpl sender : _senders.values()) {
sender.free();
}
_senders.clear();
- for(ReceiverImpl receiver : _receivers.values())
- {
+ for(ReceiverImpl receiver : _receivers.values()) {
receiver.free();
}
_receivers.clear();
}
+ void modifyEndpoints() {
+ for (SenderImpl snd : _senders.values()) {
+ snd.modifyEndpoints();
+ }
+
+ for (ReceiverImpl rcv : _receivers.values()) {
+ rcv.modifyEndpoints();
+ }
+ modified();
+ }
+
TransportSession getTransportSession()
{
return _transportSession;
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jun 19 17:23:39 2014
@@ -193,9 +193,10 @@ public class TransportImpl extends Endpo
@Override
public void bind(Connection conn)
{
- // TODO - check if already bound
- ((ConnectionImpl) conn).setTransport(this);
_connectionEndpoint = (ConnectionImpl) conn;
+ // TODO - check if already bound
+ _connectionEndpoint.setTransport(this);
+ _connectionEndpoint.incref();
if(getRemoteState() != EndpointState.UNINITIALIZED)
{
@@ -210,6 +211,23 @@ public class TransportImpl extends Endpo
}
@Override
+ public void unbind()
+ {
+ _connectionEndpoint.modifyEndpoints();
+
+ _connectionEndpoint.setTransport(null);
+ _connectionEndpoint.decref();
+
+ for (TransportSession ts: _transportSessionState.values()) {
+ ts.unbind();
+ }
+
+ for (TransportLink tl: _transportLinkState.values()) {
+ tl.unbind();
+ }
+ }
+
+ @Override
public int input(byte[] bytes, int offset, int length)
{
oldApiCheckStateBeforeInput(length).checkIsOk();
@@ -911,10 +929,10 @@ public class TransportImpl extends Endpo
}
@Override
- public void free()
- {
- super.free();
- }
+ void postFinal() {}
+
+ @Override
+ void doFree() { }
//==================================================================================================================
// handle incoming amqp data
@@ -1095,6 +1113,7 @@ public class TransportImpl extends Endpo
LinkImpl link = transportLink.getLink();
transportLink.receivedDetach();
transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+ transportLink.clearRemoteHandle();
link.setRemoteState(EndpointState.CLOSED);
if(detach.getError() != null)
{
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Thu Jun 19 17:23:39 2014
@@ -51,6 +51,12 @@ class TransportLink<T extends LinkImpl>
: new TransportSender((SenderImpl)link));
}
+ void unbind()
+ {
+ clearLocalHandle();
+ clearRemoteHandle();
+ }
+
public UnsignedInteger getLocalHandle()
{
return _localHandle;
@@ -58,6 +64,9 @@ class TransportLink<T extends LinkImpl>
public void setLocalHandle(UnsignedInteger localHandle)
{
+ if (_localHandle == null) {
+ _link.incref();
+ }
_localHandle = localHandle;
}
@@ -78,6 +87,9 @@ class TransportLink<T extends LinkImpl>
public void clearLocalHandle()
{
+ if (_localHandle != null) {
+ _link.decref();
+ }
_localHandle = null;
}
@@ -88,9 +100,20 @@ class TransportLink<T extends LinkImpl>
public void setRemoteHandle(UnsignedInteger remoteHandle)
{
+ if (_remoteHandle == null) {
+ _link.incref();
+ }
_remoteHandle = remoteHandle;
}
+ public void clearRemoteHandle()
+ {
+ if (_remoteHandle != null) {
+ _link.decref();
+ }
+ _remoteHandle = null;
+ }
+
public UnsignedInteger getDeliveryCount()
{
return _deliveryCount;
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Thu Jun 19 17:23:39 2014
@@ -69,6 +69,12 @@ class TransportSession
_session = session;
}
+ void unbind()
+ {
+ unsetLocalChannel();
+ unsetRemoteChannel();
+ }
+
public SessionImpl getSession()
{
return _session;
@@ -81,6 +87,9 @@ class TransportSession
public void setLocalChannel(int localChannel)
{
+ if (!isLocalChannelSet()) {
+ _session.incref();
+ }
_localChannel = localChannel;
}
@@ -91,6 +100,9 @@ class TransportSession
public void setRemoteChannel(int remoteChannel)
{
+ if (!isRemoteChannelSet()) {
+ _session.incref();
+ }
_remoteChannel = remoteChannel;
}
@@ -116,11 +128,17 @@ class TransportSession
public void unsetLocalChannel()
{
+ if (isLocalChannelSet()) {
+ _session.decref();
+ }
_localChannel = -1;
}
public void unsetRemoteChannel()
{
+ if (isRemoteChannelSet()) {
+ _session.decref();
+ }
_remoteChannel = -1;
}
@@ -262,7 +280,7 @@ class TransportSession
_unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
getSession().incrementIncomingDeliveries(1);
}
- if( transfer.getState()!=null )
+ if( transfer.getState()!=null )
{
delivery.setRemoteDeliveryState(transfer.getState());
}
@@ -313,7 +331,7 @@ class TransportSession
public void freeLocalChannel()
{
- _localChannel = -1;
+ unsetLocalChannel();
}
private void setRemoteIncomingWindow(UnsignedInteger incomingWindow)
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Thu Jun 19 17:23:39 2014
@@ -244,6 +244,9 @@ def pn_connection_close(conn):
conn.on_close()
conn.impl.close()
+def pn_connection_free(conn):
+ conn.impl.free()
+
class pn_session_wrapper(endpoint_wrapper):
pass
@@ -325,7 +328,7 @@ def pn_receiver(ssn, name):
return wrap(ssn.impl.receiver(name), pn_link_wrapper)
def pn_session_free(ssn):
- ssn.impl = None
+ ssn.impl.free()
TERMINUS_TYPES_J2P = {
Source: PN_SOURCE,
@@ -652,7 +655,7 @@ def pn_link_current(link):
return wrap(link.impl.current(), pn_delivery_wrapper)
def pn_link_free(link):
- link.impl = None
+ link.impl.free()
def pn_work_head(conn):
return wrap(conn.impl.getWorkHead(), pn_delivery_wrapper)
@@ -877,6 +880,10 @@ def pn_transport_bind(trans, conn):
trans.impl.bind(conn.impl)
return 0
+def pn_transport_unbind(trans):
+ trans.impl.unbind()
+ return 0
+
def pn_transport_trace(trans, n):
# XXX
pass
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1603963&r1=1603962&r2=1603963&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jun 19 17:23:39 2014
@@ -389,7 +389,7 @@ class LinkTest(Test):
def teardown(self):
self.cleanup()
gc.collect()
- assert not gc.garbage
+ assert not gc.garbage, gc.garbage
def test_open_close(self):
assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
@@ -2071,49 +2071,62 @@ class DeliveryTest(Test):
class EventTest(Test):
+ def setup(self):
+ self.collector = Collector()
+
def teardown(self):
self.cleanup()
+ self.drain()
- def list(self, collector):
+ def drain(self):
result = []
while True:
- e = collector.peek()
+ e = self.collector.peek()
if e:
result.append(e)
- collector.pop()
+ self.collector.pop()
else:
break
return result
- def expect(self, collector, *types):
- events = self.list(collector)
- assert types == tuple([e.type for e in events]), (types, events)
- if len(events) == 1:
- return events[0]
- elif len(events) > 1:
- return events
+ def expect(self, *types):
+ return self.expect_oneof(types)
+
+ def expect_oneof(self, *sequences):
+ events = self.drain()
+ types = tuple([e.type for e in events])
+
+ for alternative in sequences:
+ if types == alternative:
+ if len(events) == 1:
+ return events[0]
+ elif len(events) > 1:
+ return events
+ else:
+ return
+
+ assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)
def testEndpointEvents(self):
c1, c2 = self.connection()
- coll = Collector()
- c1.collect(coll)
- self.expect(coll, Event.CONNECTION_INIT)
+ c1.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
self.pump()
- self.expect(coll)
+ self.expect()
c2.open()
self.pump()
- self.expect(coll, Event.CONNECTION_REMOTE_STATE)
+ self.expect(Event.CONNECTION_REMOTE_STATE)
self.pump()
- self.expect(coll)
+ self.expect()
ssn = c2.session()
snd = ssn.sender("sender")
ssn.open()
snd.open()
- self.expect(coll)
+ self.expect()
self.pump()
- self.expect(coll, Event.SESSION_INIT, Event.SESSION_REMOTE_STATE,
+ self.expect(Event.SESSION_INIT, Event.SESSION_REMOTE_STATE,
Event.LINK_INIT, Event.LINK_REMOTE_STATE)
c1.open()
@@ -2122,66 +2135,113 @@ class EventTest(Test):
rcv = ssn2.receiver("receiver")
rcv.open()
self.pump()
- self.expect(coll,
- Event.CONNECTION_LOCAL_STATE,
- Event.TRANSPORT,
- Event.SESSION_INIT,
- Event.SESSION_LOCAL_STATE,
- Event.TRANSPORT,
- Event.LINK_INIT,
- Event.LINK_LOCAL_STATE,
+ self.expect(Event.CONNECTION_LOCAL_STATE, Event.TRANSPORT,
+ Event.SESSION_INIT, Event.SESSION_LOCAL_STATE,
+ Event.TRANSPORT, Event.LINK_INIT, Event.LINK_LOCAL_STATE,
Event.TRANSPORT)
+ rcv.close()
+ self.expect(Event.LINK_LOCAL_STATE, Event.TRANSPORT)
+ self.pump()
+ rcv.free()
+ del rcv
+ self.expect(Event.LINK_FINAL)
+ ssn2.free()
+ del ssn2
+ self.expect(Event.SESSION_LOCAL_STATE, Event.TRANSPORT)
+ self.pump()
+ self.expect(Event.SESSION_FINAL)
+ c1.free()
+ c1._transport.unbind()
+ self.expect(Event.CONNECTION_LOCAL_STATE, Event.TRANSPORT,
+ Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL)
+
+ def testConnectionINIT_FINAL(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ c.free()
+ self.expect(Event.CONNECTION_FINAL)
+
+ def testSessionINIT_FINAL(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ s = c.session()
+ self.expect(Event.SESSION_INIT)
+ s.free()
+ self.expect(Event.SESSION_FINAL)
+ c.free()
+ self.expect(Event.CONNECTION_FINAL)
+
+ def testLinkINIT_FINAL(self):
+ c = Connection()
+ c.collect(self.collector)
+ self.expect(Event.CONNECTION_INIT)
+ s = c.session()
+ self.expect(Event.SESSION_INIT)
+ r = s.receiver("asdf")
+ self.expect(Event.LINK_INIT)
+ r.free()
+ self.expect(Event.LINK_FINAL)
+ c.free()
+ self.expect(Event.SESSION_FINAL, Event.CONNECTION_FINAL)
+
def testFlowEvents(self):
snd, rcv = self.link("test-link")
- coll = Collector()
- snd.session.connection.collect(coll)
+ snd.session.connection.collect(self.collector)
rcv.open()
rcv.flow(10)
self.pump()
- self.expect(coll, Event.CONNECTION_INIT, Event.SESSION_INIT,
+ self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
Event.LINK_INIT, Event.LINK_REMOTE_STATE, Event.LINK_FLOW)
rcv.flow(10)
self.pump()
- self.expect(coll, Event.LINK_FLOW)
- return snd, rcv, coll
+ self.expect(Event.LINK_FLOW)
+ return snd, rcv
def testDeliveryEvents(self):
snd, rcv = self.link("test-link")
- coll = Collector()
- rcv.session.connection.collect(coll)
+ rcv.session.connection.collect(self.collector)
rcv.open()
rcv.flow(10)
self.pump()
- self.expect(coll, Event.CONNECTION_INIT, Event.SESSION_INIT,
+ self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
Event.LINK_INIT, Event.LINK_LOCAL_STATE, Event.TRANSPORT,
Event.TRANSPORT)
snd.delivery("delivery")
snd.send("Hello World!")
snd.advance()
self.pump()
- self.expect(coll)
+ self.expect()
snd.open()
self.pump()
- self.expect(coll, Event.LINK_REMOTE_STATE, Event.DELIVERY)
+ self.expect(Event.LINK_REMOTE_STATE, Event.DELIVERY)
+ rcv.session.connection._transport.unbind()
+ rcv.session.connection.free()
+ self.expect_oneof((Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT, Event.LINK_LOCAL_STATE,
+ Event.SESSION_LOCAL_STATE, Event.CONNECTION_LOCAL_STATE, Event.LINK_FINAL,
+ Event.SESSION_FINAL, Event.CONNECTION_FINAL),
+ (Event.TRANSPORT, Event.TRANSPORT, Event.TRANSPORT, Event.LINK_LOCAL_STATE,
+ Event.LINK_FINAL, Event.SESSION_LOCAL_STATE, Event.SESSION_FINAL,
+ Event.CONNECTION_LOCAL_STATE, Event.CONNECTION_FINAL))
def testDeliveryEventsDisp(self):
- snd, rcv, coll = self.testFlowEvents()
+ snd, rcv = self.testFlowEvents()
snd.open()
dlv = snd.delivery("delivery")
snd.send("Hello World!")
assert snd.advance()
- self.expect(coll,
- Event.LINK_LOCAL_STATE,
+ self.expect(Event.LINK_LOCAL_STATE,
Event.TRANSPORT,
Event.TRANSPORT,
Event.TRANSPORT)
self.pump()
- self.expect(coll)
+ self.expect()
rdlv = rcv.current
assert rdlv != None
assert rdlv.tag == "delivery"
rdlv.update(Delivery.ACCEPTED)
self.pump()
- event = self.expect(coll, Event.DELIVERY)
+ event = self.expect(Event.DELIVERY)
assert event.delivery == dlv
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]