Carsten Lohmann created PROTON-2177: ---------------------------------------
Summary: IllegalStateException when freeing link as part of timeout handling Key: PROTON-2177 URL: https://issues.apache.org/jira/browse/PROTON-2177 Project: Qpid Proton Issue Type: Bug Components: proton-j Affects Versions: proton-j-0.33.3 Reporter: Carsten Lohmann Invoking free() on a link may result in such an exception if the preconditions below are met: {noformat} java.lang.IllegalStateException at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54) at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128) at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52) at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70) at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86) at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453) at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425) at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536) at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570) at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528) {noformat} The scenario: We have implemented the logic for creating AMQP links with a timeout-mechanism. That means that after invoking {{link.open()}} we wait for a predefined time and if we haven't received the {{attach}} frame from the server at that point, we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak. This has mostly worked well so far. In cases where the server was not sending the attach frame in time, after calling {{sender.close()}} the server usually finally responded with a {{detach}} frame (instead of sending an {{attach}} in between). But lately, when testing with a high number of links (>10000) we sometimes encountered such a server behaviour (Qpid): - client sends {{attach}} frame - linkEstablishmentTimeout: server doesn't respond in time so client invokes {{link.close()}} and then {{link.free()}} - *server sends an attach frame* - server sends a detach frame If that happens mulitple times on the same session, the above exception occurs when calling {{link.free()}}. Afterwards there are 'socked closed' exceptions. If we don't invoke {{link.free}} as part of the linkEstablishmentTimeout handling, the exception doesn't occur. But not invoking {{link.free()}} would create a memory leak if the server doesn't respond at all to the {{attach}} frame. --- The issue can be reproduced with this test method (to be run as an additional method in the "org.apache.qpid.proton.systemtests.FreeTest" class) {code:java|collapse=true} @Test public void testFreeOnLinkEstablishmentTimeout() throws Exception { LOGGER.fine(bold("======== About to create transports")); getClient().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); getServer().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); getClient().connection = Proton.connection(); getClient().transport.bind(getClient().connection); getServer().connection = Proton.connection(); getServer().transport.bind(getServer().connection); LOGGER.fine(bold("======== About to open connections")); getClient().connection.open(); getServer().connection.open(); doOutputInputCycle(); LOGGER.fine(bold("======== About to open session")); getClient().session = getClient().connection.session(); getClient().session.open(); pumpClientToServer(); getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); getServer().session.open(); assertEndpointState(getServer().session, ACTIVE, ACTIVE); pumpServerToClient(); assertEndpointState(getClient().session, ACTIVE, ACTIVE); for (int i = 0; i < 5; i++) { LOGGER.fine("\n\n"); LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session))); getClient().source = new Source(); getClient().source.setAddress(null); getClient().target = new Target(); getClient().target.setAddress("myQueue"); getClient().sender = getClient().session.sender("sender" + i); getClient().sender.setTarget(getClient().target); getClient().sender.setSource(getClient().source); getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); getClient().sender.open(); assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); pumpClientToServer(); LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling")); getClient().sender.close(); getClient().sender.free(); // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received? // write to outputBuffer already ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)")); getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); assertTerminusEquals(getClient().target, serverRemoteTarget); getServer().receiver.setTarget(serverRemoteTarget); assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); getServer().receiver.open(); assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); // write server attach to server outputBuffer already ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server")); getServer().transport.getInputBuffer().put(clientBuffer); getClient().transport.outputConsumed(); getServer().transport.processInput().checkIsOk(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client")); getClient().transport.getInputBuffer().put(serverBuffer); getClient().transport.processInput().checkIsOk(); getServer().transport.outputConsumed(); // assert the server got the detach assertEndpointState(getServer().receiver, ACTIVE, CLOSED); // let the server respond with a detach getServer().receiver.close(); LOGGER.fine(bold("~~~~~~~~~~ pump detach to client")); pumpServerToClient(); assertEndpointState(getClient().sender, CLOSED, CLOSED); getClient().sender.free(); } getClient().transport.unbind(); LOGGER.fine(bold("======== About to close and free client's connection")); getClient().connection.close(); getClient().connection.free(); } private Integer getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException { if (!(protonSession instanceof EndpointImpl)) { return null; } final Field refcountField = EndpointImpl.class.getDeclaredField("refcount"); refcountField.setAccessible(true); return (Integer) refcountField.get(protonSession); } {code} (Testcode using vertx-proton and separate client/server classes can also be provided if needed.) -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org