[ https://issues.apache.org/jira/browse/PROTON-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Carsten Lohmann updated PROTON-2177: ------------------------------------ Description: Invoking free() on a link, or the processing of a received {{detach}} frame, may result in such an exception if the preconditions below are met: {noformat} java.lang.IllegalStateException at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54) at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128) at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52) at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70) at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86) at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453) at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425) at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536) at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570) at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528) {noformat} The scenario: We have implemented the logic for creating AMQP links with a timeout-mechanism. That means that after invoking {{link.open()}} we wait for a predefined time and if we haven't received the {{attach}} frame from the server at that point, we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak. This has mostly worked well so far. In cases where the server was not sending the attach frame in time, after calling {{sender.close()}} the server usually finally responded with a {{detach}} frame (instead of sending an {{attach}} in between). 
But lately, when testing with a high number of links (>10000) we sometimes encountered such a server behaviour (with Qpid Dispatch Router as server): - client sends {{attach}} frame - linkEstablishmentTimeout: server doesn't respond in time so client invokes {{link.close()}} and then {{link.free()}} - *server sends an attach frame* - server sends a detach frame If that happens multiple times on the same session, the above exception occurs when calling {{link.free()}}. Afterwards there are 'socket closed' exceptions. If we don't invoke {{link.free}} as part of the linkEstablishmentTimeout handling, the exception doesn't occur. But not invoking {{link.free()}} would create a memory leak if the server doesn't respond at all to the {{attach}} frame. --- The issue can be reproduced with this test method (to be run as an additional method in the "org.apache.qpid.proton.systemtests.FreeTest" class) {code:java|collapse=true} @Test public void testFreeOnLinkEstablishmentTimeout() throws Exception { LOGGER.fine(bold("======== About to create transports")); getClient().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); getServer().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); getClient().connection = Proton.connection(); getClient().transport.bind(getClient().connection); getServer().connection = Proton.connection(); getServer().transport.bind(getServer().connection); LOGGER.fine(bold("======== About to open connections")); getClient().connection.open(); getServer().connection.open(); doOutputInputCycle(); LOGGER.fine(bold("======== About to open session")); getClient().session = getClient().connection.session(); getClient().session.open(); pumpClientToServer(); getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); assertEndpointState(getServer().session, UNINITIALIZED, 
ACTIVE); getServer().session.open(); assertEndpointState(getServer().session, ACTIVE, ACTIVE); pumpServerToClient(); assertEndpointState(getClient().session, ACTIVE, ACTIVE); for (int i = 0; i < 5; i++) { LOGGER.fine("\n\n"); LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session))); getClient().source = new Source(); getClient().source.setAddress(null); getClient().target = new Target(); getClient().target.setAddress("myQueue"); getClient().sender = getClient().session.sender("sender" + i); getClient().sender.setTarget(getClient().target); getClient().sender.setSource(getClient().source); getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); getClient().sender.open(); assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); pumpClientToServer(); LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling")); getClient().sender.close(); getClient().sender.free(); // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received? 
// write to outputBuffer already ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)")); getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); assertTerminusEquals(getClient().target, serverRemoteTarget); getServer().receiver.setTarget(serverRemoteTarget); assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); getServer().receiver.open(); assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); // write server attach to server outputBuffer already ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server")); getServer().transport.getInputBuffer().put(clientBuffer); getClient().transport.outputConsumed(); getServer().transport.processInput().checkIsOk(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client")); getClient().transport.getInputBuffer().put(serverBuffer); getClient().transport.processInput().checkIsOk(); getServer().transport.outputConsumed(); // assert the server got the detach assertEndpointState(getServer().receiver, ACTIVE, CLOSED); // let the server respond with a detach getServer().receiver.close(); LOGGER.fine(bold("~~~~~~~~~~ pump detach to client")); pumpServerToClient(); assertEndpointState(getClient().sender, CLOSED, CLOSED); getClient().sender.free(); } getClient().transport.unbind(); LOGGER.fine(bold("======== About to close and free client's connection")); getClient().connection.close(); getClient().connection.free(); } private Integer 
getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException { if (!(protonSession instanceof EndpointImpl)) { return null; } final Field refcountField = EndpointImpl.class.getDeclaredField("refcount"); refcountField.setAccessible(true); return (Integer) refcountField.get(protonSession); } {code} (Testcode using vertx-proton and separate client/server classes can also be provided if needed.) was: Invoking free() on a link may result in such an exception if the preconditions below are met: {noformat} java.lang.IllegalStateException at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54) at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128) at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52) at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379) at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70) at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86) at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453) at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425) at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536) at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570) at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528) {noformat} The scenario: We have implemented the logic for creating AMQP links with a timeout-mechanism. That means that after invoking {{link.open()}} we wait for a predefined time and if we haven't received the {{attach}} frame from the server at that point, we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak. This has mostly worked well so far. 
In cases where the server was not sending the attach frame in time, after calling {{sender.close()}} the server usually finally responded with a {{detach}} frame (instead of sending an {{attach}} in between). But lately, when testing with a high number of links (>10000) we sometimes encountered such a server behaviour (with Qpid Dispatch Router as server): - client sends {{attach}} frame - linkEstablishmentTimeout: server doesn't respond in time so client invokes {{link.close()}} and then {{link.free()}} - *server sends an attach frame* - server sends a detach frame If that happens multiple times on the same session, the above exception occurs when calling {{link.free()}}. Afterwards there are 'socket closed' exceptions. If we don't invoke {{link.free}} as part of the linkEstablishmentTimeout handling, the exception doesn't occur. But not invoking {{link.free()}} would create a memory leak if the server doesn't respond at all to the {{attach}} frame. --- The issue can be reproduced with this test method (to be run as an additional method in the "org.apache.qpid.proton.systemtests.FreeTest" class) {code:java|collapse=true} @Test public void testFreeOnLinkEstablishmentTimeout() throws Exception { LOGGER.fine(bold("======== About to create transports")); getClient().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); getServer().transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); getClient().connection = Proton.connection(); getClient().transport.bind(getClient().connection); getServer().connection = Proton.connection(); getServer().transport.bind(getServer().connection); LOGGER.fine(bold("======== About to open connections")); getClient().connection.open(); getServer().connection.open(); doOutputInputCycle(); LOGGER.fine(bold("======== About to open session")); getClient().session = 
getClient().connection.session(); getClient().session.open(); pumpClientToServer(); getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); getServer().session.open(); assertEndpointState(getServer().session, ACTIVE, ACTIVE); pumpServerToClient(); assertEndpointState(getClient().session, ACTIVE, ACTIVE); for (int i = 0; i < 5; i++) { LOGGER.fine("\n\n"); LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session))); getClient().source = new Source(); getClient().source.setAddress(null); getClient().target = new Target(); getClient().target.setAddress("myQueue"); getClient().sender = getClient().session.sender("sender" + i); getClient().sender.setTarget(getClient().target); getClient().sender.setSource(getClient().source); getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); getClient().sender.open(); assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); pumpClientToServer(); LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling")); getClient().sender.close(); getClient().sender.free(); // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received? 
// write to outputBuffer already ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)")); getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); assertTerminusEquals(getClient().target, serverRemoteTarget); getServer().receiver.setTarget(serverRemoteTarget); assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); getServer().receiver.open(); assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); // write server attach to server outputBuffer already ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server")); getServer().transport.getInputBuffer().put(clientBuffer); getClient().transport.outputConsumed(); getServer().transport.processInput().checkIsOk(); LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client")); getClient().transport.getInputBuffer().put(serverBuffer); getClient().transport.processInput().checkIsOk(); getServer().transport.outputConsumed(); // assert the server got the detach assertEndpointState(getServer().receiver, ACTIVE, CLOSED); // let the server respond with a detach getServer().receiver.close(); LOGGER.fine(bold("~~~~~~~~~~ pump detach to client")); pumpServerToClient(); assertEndpointState(getClient().sender, CLOSED, CLOSED); getClient().sender.free(); } getClient().transport.unbind(); LOGGER.fine(bold("======== About to close and free client's connection")); getClient().connection.close(); getClient().connection.free(); } private Integer 
getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException { if (!(protonSession instanceof EndpointImpl)) { return null; } final Field refcountField = EndpointImpl.class.getDeclaredField("refcount"); refcountField.setAccessible(true); return (Integer) refcountField.get(protonSession); } {code} (Testcode using vertx-proton and separate client/server classes can also be provided if needed.) > IllegalStateException when freeing link as part of timeout handling > ------------------------------------------------------------------- > > Key: PROTON-2177 > URL: https://issues.apache.org/jira/browse/PROTON-2177 > Project: Qpid Proton > Issue Type: Bug > Components: proton-j > Affects Versions: proton-j-0.33.3 > Reporter: Carsten Lohmann > Priority: Major > > Invoking free() on a link, or the processing of a received {{detach}} frame, > may result in such an exception if the preconditions below are met: > {noformat} > java.lang.IllegalStateException > at > org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54) > at > org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128) > at > org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52) > at > org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125) > at > org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379) > at > org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70) > at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86) > at > org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453) > at > org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425) > at > org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536) > at > org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570) > at > 
org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528) > {noformat} > The scenario: > We have implemented the logic for creating AMQP links with a > timeout-mechanism. That means that after invoking {{link.open()}} we wait for > a predefined time and if we haven't received the {{attach}} frame from the > server at that point, we call {{link.close()}} and then {{link.free()}} to > avoid having a memory leak. > This has mostly worked well so far. In cases where the server was not sending > the attach frame in time, after calling {{sender.close()}} the server usually > finally responded with a {{detach}} frame (instead of sending an {{attach}} > in between). > But lately, when testing with a high number of links (>10000) we sometimes > encountered such a server behaviour (with Qpid Dispatch Router as server): > - client sends {{attach}} frame > - linkEstablishmentTimeout: server doesn't respond in time so client invokes > {{link.close()}} and then {{link.free()}} > - *server sends an attach frame* > - server sends a detach frame > If that happens multiple times on the same session, the above exception > occurs when calling {{link.free()}}. Afterwards there are 'socket closed' > exceptions. > If we don't invoke {{link.free}} as part of the linkEstablishmentTimeout > handling, the exception doesn't occur. > But not invoking {{link.free()}} would create a memory leak if the server > doesn't respond at all to the {{attach}} frame. 
> --- > The issue can be reproduced with this test method (to be run as an additional > method in the "org.apache.qpid.proton.systemtests.FreeTest" class) > {code:java|collapse=true} > @Test > public void testFreeOnLinkEstablishmentTimeout() throws Exception { > LOGGER.fine(bold("======== About to create transports")); > getClient().transport = Proton.transport(); > ProtocolTracerEnabler.setProtocolTracer(getClient().transport, > TestLoggingHelper.CLIENT_PREFIX); > getServer().transport = Proton.transport(); > ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " > " + TestLoggingHelper.SERVER_PREFIX); > getClient().connection = Proton.connection(); > getClient().transport.bind(getClient().connection); > getServer().connection = Proton.connection(); > getServer().transport.bind(getServer().connection); > LOGGER.fine(bold("======== About to open connections")); > getClient().connection.open(); > getServer().connection.open(); > doOutputInputCycle(); > LOGGER.fine(bold("======== About to open session")); > getClient().session = getClient().connection.session(); > getClient().session.open(); > pumpClientToServer(); > getServer().session = > getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); > assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); > getServer().session.open(); > assertEndpointState(getServer().session, ACTIVE, ACTIVE); > pumpServerToClient(); > assertEndpointState(getClient().session, ACTIVE, ACTIVE); > for (int i = 0; i < 5; i++) { > LOGGER.fine("\n\n"); > LOGGER.fine(bold("======== About to create client sender " + i + > "; refcount on session: " + getSessionRefCount(getClient().session))); > getClient().source = new Source(); > getClient().source.setAddress(null); > getClient().target = new Target(); > getClient().target.setAddress("myQueue"); > getClient().sender = getClient().session.sender("sender" + i); > getClient().sender.setTarget(getClient().target); > getClient().sender.setSource(getClient().source); > > 
getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); > > getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); > assertEndpointState(getClient().sender, UNINITIALIZED, > UNINITIALIZED); > getClient().sender.open(); > assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); > pumpClientToServer(); > LOGGER.fine(bold("======== client: invoke close & free already, > as would be done in linkEstablishmentTimeout handling")); > getClient().sender.close(); > getClient().sender.free(); // skipping this, the error doesn't > occur; but how to prevent a memory leak if no attach frame ever is received? > // write to outputBuffer already > ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); > LOGGER.fine(bold("======== About to set up server receiver (not > having gotten the detach yet)")); > getServer().receiver = (Receiver) > getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); > > getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); > > getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); > org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = > getServer().receiver.getRemoteTarget(); > assertTerminusEquals(getClient().target, serverRemoteTarget); > getServer().receiver.setTarget(serverRemoteTarget); > assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); > getServer().receiver.open(); > assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); > // write server attach to server outputBuffer already > ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); > LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent > clientBuffer (with 'detach') to the server")); > getServer().transport.getInputBuffer().put(clientBuffer); > getClient().transport.outputConsumed(); > getServer().transport.processInput().checkIsOk(); > LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent > serverBuffer (with 'attach') to 
the client")); > getClient().transport.getInputBuffer().put(serverBuffer); > getClient().transport.processInput().checkIsOk(); > getServer().transport.outputConsumed(); > // assert the server got the detach > assertEndpointState(getServer().receiver, ACTIVE, CLOSED); > // let the server respond with a detach > getServer().receiver.close(); > LOGGER.fine(bold("~~~~~~~~~~ pump detach to client")); > pumpServerToClient(); > assertEndpointState(getClient().sender, CLOSED, CLOSED); > getClient().sender.free(); > } > getClient().transport.unbind(); > LOGGER.fine(bold("======== About to close and free client's > connection")); > getClient().connection.close(); > getClient().connection.free(); > } > private Integer getSessionRefCount(final Session protonSession) throws > NoSuchFieldException, IllegalAccessException { > if (!(protonSession instanceof EndpointImpl)) { > return null; > } > final Field refcountField = > EndpointImpl.class.getDeclaredField("refcount"); > refcountField.setAccessible(true); > return (Integer) refcountField.get(protonSession); > } > {code} > (Test code using vertx-proton and separate client/server classes can also be > provided if needed.) -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org