Author: gsim Date: Fri Oct 10 12:41:20 2014 New Revision: 1630787 URL: http://svn.apache.org/r1630787 Log: PROTON-641: fixed link cleanup on connection/session abort; made free not close automatically
Modified: qpid/proton/branches/examples/proton-c/src/engine/engine.c qpid/proton/branches/examples/proton-c/src/transport/transport.c qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java qpid/proton/branches/examples/tests/python/proton_tests/engine.py Modified: qpid/proton/branches/examples/proton-c/src/engine/engine.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/engine/engine.c?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/engine/engine.c (original) +++ qpid/proton/branches/examples/proton-c/src/engine/engine.c Fri Oct 10 12:41:20 2014 @@ -111,8 +111,6 @@ void pn_endpoint_tini(pn_endpoint_t *end void pn_connection_free(pn_connection_t *connection) { assert(!connection->endpoint.freed); - if (pn_connection_state(connection) & PN_LOCAL_ACTIVE) - pn_connection_close(connection); // free those endpoints that haven't been freed by the application LL_REMOVE(connection, endpoint, &connection->endpoint); while (connection->endpoint_head) { @@ -226,8 +224,6 @@ void pn_session_free(pn_session_t *sessi pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); pn_link_free(link); } - if (pn_session_state(session) & PN_LOCAL_ACTIVE) - pn_session_close(session); pn_remove_session(session->connection, session); pn_endpoint_t *endpoint = (pn_endpoint_t *) session; LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint); @@ -282,8 +278,6 @@ void pn_terminus_free(pn_terminus_t *ter void pn_link_free(pn_link_t *link) { assert(!link->endpoint.freed); - if (pn_link_state(link) & PN_LOCAL_ACTIVE) - pn_link_close(link); pn_remove_link(link->session, link); pn_endpoint_t *endpoint = (pn_endpoint_t *) link; LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint); Modified: qpid/proton/branches/examples/proton-c/src/transport/transport.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/transport/transport.c?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/transport/transport.c (original) +++ qpid/proton/branches/examples/proton-c/src/transport/transport.c Fri Oct 10 12:41:20 2014 @@ -183,14 +183,20 @@ pn_session_t *pn_channel_state(pn_transp return (pn_session_t *) pn_hash_get(transport->remote_channels, channel); } -static void pn_map_channel(pn_transport_t *transport, uint16_t channel, pn_session_t *session) +static void pni_map_remote_channel(pn_session_t *session, uint16_t channel) { + pn_transport_t *transport = session->connection->transport; pn_hash_put(transport->remote_channels, channel, session); session->state.remote_channel = channel; } -void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn) +void pni_transport_unbind_handles(pn_hash_t *handles); + +static void pni_unmap_remote_channel(pn_session_t *ssn) { + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.remote_handles); + pn_transport_t *transport = ssn->connection->transport; uint16_t channel = ssn->state.remote_channel; ssn->state.remote_channel = -2; // note: may free the session: @@ -331,18 +337,18 @@ pn_error_t *pn_transport_error(pn_transp return NULL; } -static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link) +static void pni_map_remote_handle(pn_link_t *link, uint32_t handle) { link->state.remote_handle = handle; - pn_hash_put(ssn->state.remote_handles, handle, link); + pn_hash_put(link->session->state.remote_handles, handle, link); } -void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link) +static void pni_unmap_remote_handle(pn_link_t *link) { - uint32_t handle = link->state.remote_handle; + uintptr_t handle = link->state.remote_handle; link->state.remote_handle = -2; // may delete link: - pn_hash_del(ssn->state.remote_handles, handle); + pn_hash_del(link->session->state.remote_handles, handle); } pn_link_t *pn_handle_state(pn_session_t *ssn, uint32_t handle) @@ -504,7 +510,7 @@ int pn_do_begin(pn_dispatcher_t *disp) ssn = pn_session(transport->connection); } ssn->state.incoming_transfer_count = next; - pn_map_channel(transport, disp->channel, ssn); + pni_map_remote_channel(ssn, disp->channel); PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE); pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_OPEN, ssn); return 0; @@ -622,7 +628,7 @@ int pn_do_attach(pn_dispatcher_t *disp) free(strheap); } - pn_map_handle(ssn, handle, link); + pni_map_remote_handle(link, handle); PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE); pn_terminus_t *rsrc = &link->remote_source; if (source.start || src_dynamic) { @@ -941,7 +947,7 @@ int pn_do_detach(pn_dispatcher_t *disp) // TODO: implement } - pn_unmap_handle(ssn, link); + pni_unmap_remote_handle(link); return 0; } @@ -953,7 +959,7 @@ int pn_do_end(pn_dispatcher_t *disp) if (err) return err; PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put(transport->connection->collector, PN_SESSION_REMOTE_CLOSE, ssn); - pn_unmap_channel(transport, ssn); + pni_unmap_remote_channel(ssn); return 0; } @@ -1195,6 +1201,15 @@ size_t pn_session_incoming_window(pn_ses } } +static void pni_map_local_channel(pn_session_t *ssn) +{ + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uint16_t channel = allocate_alias(transport->local_channels); + state->local_channel = channel; + pn_hash_put(transport->local_channels, channel, ssn); +} + int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SESSION && transport->open_sent) @@ -1203,16 +1218,14 @@ int pn_process_ssn_setup(pn_transport_t pn_session_state_t *state = &ssn->state; if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1) { - uint16_t channel = allocate_alias(transport->local_channels); + pni_map_local_channel(ssn); state->incoming_window = pn_session_incoming_window(ssn); state->outgoing_window = pn_session_outgoing_window(ssn); - pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN, + pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN, ((int16_t) state->remote_channel >= 0), state->remote_channel, state->outgoing_transfer_count, state->incoming_window, state->outgoing_window); - state->local_channel = channel; - pn_hash_put(transport->local_channels, channel, ssn); } } @@ -1235,6 +1248,13 @@ static const char *expiry_symbol(pn_expi return NULL; } +static void pni_map_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + pn_session_state_t *ssn_state = &link->session->state; + state->local_handle = allocate_alias(ssn_state->local_handles); + pn_hash_put(ssn_state->local_handles, state->local_handle, link); +} + int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (transport->open_sent && (endpoint->type == SENDER || @@ -1246,8 +1266,7 @@ int pn_process_link_setup(pn_transport_t if (((int16_t) ssn_state->local_channel >= 0) && !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1) { - state->local_handle = allocate_alias(ssn_state->local_handles); - pn_hash_put(ssn_state->local_handles, state->local_handle, link); + pni_map_local_handle(link); const pn_distribution_mode_t dist_mode = link->source.distribution_mode; int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, @@ -1540,6 +1559,14 @@ int pn_process_flow_sender(pn_transport_ return 0; } +static void pni_unmap_local_handle(pn_link_t *link) { + pn_link_state_t *state = &link->state; + uintptr_t handle = state->local_handle; + state->local_handle = -2; + // may delete link + pn_hash_del(link->session->state.local_handles, handle); +} + int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SENDER || endpoint->type == RECEIVER) @@ -1568,8 +1595,7 @@ int pn_process_link_teardown(pn_transpor int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH, state->local_handle, true, (bool) name, ERROR, name, description, info); if (err) return err; - pn_hash_del(ssn_state->local_handles, state->local_handle); - state->local_handle = -2; + pni_unmap_local_handle(link); } pn_clear_modified(transport->connection, endpoint); @@ -1601,6 +1627,17 @@ bool pn_pointful_buffering(pn_transport_ return false; } +static void pni_unmap_local_channel(pn_session_t *ssn) { + // XXX: should really update link state also + pni_transport_unbind_handles(ssn->state.local_handles); + pn_transport_t *transport = ssn->connection->transport; + pn_session_state_t *state = &ssn->state; + uintptr_t channel = state->local_channel; + state->local_channel = -2; + // may delete session + pn_hash_del(transport->local_channels, channel); +} + int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) { if (endpoint->type == SESSION) @@ -1610,7 +1647,9 @@ int pn_process_ssn_teardown(pn_transport if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0 && !transport->close_sent) { - if (pn_pointful_buffering(transport, session)) return 0; + if (pn_pointful_buffering(transport, session)) { + return 0; + } const char *name = NULL; const char *description = NULL; @@ -1625,8 +1664,7 @@ int pn_process_ssn_teardown(pn_transport int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END, (bool) name, ERROR, name, description, info); if (err) return err; - pn_hash_del(transport->local_channels, state->local_channel); - state->local_channel = -2; + pni_unmap_local_channel(session); } pn_clear_modified(transport->connection, endpoint); Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Fri Oct 10 12:41:20 2014 @@ -190,11 +190,6 @@ public abstract class EndpointImpl imple freed = true; doFree(); - - if (_localState == EndpointState.ACTIVE) { - close(); - } - decref(); } Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Oct 10 12:41:20 2014 @@ -400,10 +400,6 @@ public class TransportImpl extends Endpo writeFrame(transportSession.getLocalChannel(), detach, null, null); - - // TODO - temporary hack for PROTON-154, this line should be removed and replaced - // with proper handling for closed links - link.free(); } endpoint.clearModified(); @@ -1158,14 +1154,13 @@ public class TransportImpl extends Endpo LinkImpl link = transportLink.getLink(); transportLink.receivedDetach(); transportSession.freeRemoteHandle(transportLink.getRemoteHandle()); + _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link); transportLink.clearRemoteHandle(); link.setRemoteState(EndpointState.CLOSED); if(detach.getError() != null) { link.getRemoteCondition().copyFrom(detach.getError()); } - - _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link); } else { Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Fri Oct 10 12:41:20 2014 @@ -1444,14 +1444,16 @@ public class MessengerImpl implements Me { _receivers++; _blocked.add((Receiver)link); + link.setContext(Boolean.TRUE); } } // a link is being removed, account for it. private void linkRemoved(Link _link) { - if (_link instanceof Receiver) + if (_link instanceof Receiver && (Boolean) _link.getContext()) { + _link.setContext(Boolean.FALSE); Receiver link = (Receiver)_link; assert _receivers > 0; _receivers--; Modified: qpid/proton/branches/examples/tests/python/proton_tests/engine.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/engine.py?rev=1630787&r1=1630786&r2=1630787&view=diff ============================================================================== --- qpid/proton/branches/examples/tests/python/proton_tests/engine.py (original) +++ qpid/proton/branches/examples/tests/python/proton_tests/engine.py Fri Oct 10 12:41:20 2014 @@ -2067,14 +2067,11 @@ class DeliveryTest(Test): def testCustom(self): self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3])) -class EventTest(Test): +class CollectorTest(Test): def setup(self): self.collector = Collector() - def teardown(self): - self.cleanup() - def drain(self): result = [] while True: @@ -2104,6 +2101,11 @@ class EventTest(Test): assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences) +class EventTest(CollectorTest): + + def teardown(self): + self.cleanup() + def testEndpointEvents(self): c1, c2 = self.connection() c1.collect(self.collector) @@ -2145,13 +2147,11 @@ class EventTest(Test): self.expect(Event.LINK_FINAL) ssn2.free() del ssn2 - self.expect(Event.SESSION_CLOSE, Event.TRANSPORT) self.pump() - self.expect(Event.SESSION_FINAL) c1.free() c1._transport.unbind() - self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT, - Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL) + self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL, + Event.CONNECTION_FINAL) def testConnectionINIT_FINAL(self): c = Connection() @@ -2215,12 +2215,8 @@ class EventTest(Test): self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY) rcv.session.connection._transport.unbind() rcv.session.connection.free() - self.expect_oneof((Event.TRANSPORT, Event.LINK_CLOSE, Event.SESSION_CLOSE, - Event.CONNECTION_CLOSE, Event.LINK_FINAL, - Event.SESSION_FINAL, Event.CONNECTION_FINAL), - (Event.TRANSPORT, Event.LINK_CLOSE, Event.LINK_FINAL, - Event.SESSION_CLOSE, Event.SESSION_FINAL, - Event.CONNECTION_CLOSE, Event.CONNECTION_FINAL)) + self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL, + Event.CONNECTION_FINAL) def testDeliveryEventsDisp(self): snd, rcv = self.testFlowEvents() @@ -2238,3 +2234,88 @@ class EventTest(Test): self.pump() event = self.expect(Event.DELIVERY) assert event.delivery == dlv + +class PeerTest(CollectorTest): + + def setup(self): + CollectorTest.setup(self) + self.connection = Connection() + self.connection.collect(self.collector) + self.transport = Transport() + self.transport.bind(self.connection) + self.peer = Connection() + self.peer_transport = Transport() + self.peer_transport.bind(self.peer) + self.peer_transport.trace(Transport.TRACE_OFF) + + def pump(self): + pump(self.transport, self.peer_transport) + +class TeardownLeakTest(PeerTest): + + def doLeak(self, local, remote): + self.connection.open() + self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT) + + ssn = self.connection.session() + ssn.open() + self.expect(Event.SESSION_INIT, Event.SESSION_OPEN, Event.TRANSPORT) + + snd = ssn.sender("sender") + snd.open() + self.expect(Event.LINK_INIT, Event.LINK_OPEN, Event.TRANSPORT) + + + self.pump() + + self.peer.open() + self.peer.session_head(0).open() + self.peer.link_head(0).open() + + self.pump() + self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, + Event.LINK_REMOTE_OPEN, Event.LINK_FLOW), + (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, + Event.LINK_REMOTE_OPEN)) + + if local: + snd.close() # ha!! + self.expect(Event.LINK_CLOSE, Event.TRANSPORT) + ssn.close() + self.expect(Event.SESSION_CLOSE, Event.TRANSPORT) + self.connection.close() + self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT) + + if remote: + self.peer.link_head(0).close() # ha!! + self.peer.session_head(0).close() + self.peer.close() + + self.pump() + + if remote: + self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE), + (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL, + Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE)) + else: + self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE) + + self.connection.free() + self.transport.unbind() + + self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL), + (Event.SESSION_FINAL, Event.CONNECTION_FINAL)) + + def testLocalRemoteLeak(self): + self.doLeak(True, True) + + def testLocalLeak(self): + self.doLeak(True, False) + + def testRemoteLeak(self): + self.doLeak(False, True) + + def testLeak(self): + self.doLeak(False, False) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org