[ 
https://issues.apache.org/jira/browse/PROTON-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028829#comment-17028829
 ] 

Carsten Lohmann commented on PROTON-2177:
-----------------------------------------

I wouldn't want to kill the whole session, and with it all other links on 
that session, just because one link creation attempt timed out. (In our 
scenario the session carries several thousand links, which would all need to 
be recreated.)

I would find it more consistent if "free()" detached the link object from the 
transport entirely, so that any further incoming frames for that link are 
ignored.
Or, if that is not feasible, "free()" could throw an IllegalStateException if 
the link's local and remote states are not both CLOSED. That would at least 
make the misuse obvious and prevent the obscure "decref" IllegalStateException 
above.
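
To illustrate the second option, here is a minimal sketch of such a guard. It 
is not proton-j code: {{LinkView}}, {{State}}, {{SimpleLink}} and 
{{freeIfFullyClosed}} are hypothetical stand-ins (for Link, EndpointState and 
the proposed check), used only to show the intended behaviour.

```java
import java.util.Objects;

final class GuardedFree {

    /** Stand-in for proton-j's EndpointState (assumption: same three values). */
    enum State { UNINITIALIZED, ACTIVE, CLOSED }

    /** Hypothetical minimal view of a link; NOT the real proton-j Link interface. */
    interface LinkView {
        State getLocalState();
        State getRemoteState();
        void free();
    }

    /** Trivial in-memory LinkView, used only to exercise the guard. */
    static final class SimpleLink implements LinkView {
        private final State local;
        private final State remote;
        private boolean freed;

        SimpleLink(State local, State remote) {
            this.local = local;
            this.remote = remote;
        }

        @Override public State getLocalState() { return local; }
        @Override public State getRemoteState() { return remote; }
        @Override public void free() { freed = true; }

        boolean isFreed() { return freed; }
    }

    /**
     * Frees the link only if both ends are CLOSED; otherwise fails fast with a
     * descriptive IllegalStateException and leaves the link untouched.
     */
    static void freeIfFullyClosed(LinkView link) {
        Objects.requireNonNull(link, "link");
        State local = link.getLocalState();
        State remote = link.getRemoteState();
        if (local != State.CLOSED || remote != State.CLOSED) {
            throw new IllegalStateException(
                    "free() called while link is not fully closed: local=" + local
                            + ", remote=" + remote);
        }
        link.free();
    }
}
```

In the timeout scenario from the description, the link would be locally CLOSED 
but remotely still UNINITIALIZED when "free()" is called, so a check like this 
would reject the call right away with a clear message.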




> IllegalStateException when freeing link as part of timeout handling
> -------------------------------------------------------------------
>
>                 Key: PROTON-2177
>                 URL: https://issues.apache.org/jira/browse/PROTON-2177
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: proton-j
>    Affects Versions: proton-j-0.33.3
>            Reporter: Carsten Lohmann
>            Priority: Major
>
> Invoking free() on a link, or the processing of a received {{detach}} frame, 
> may result in such an exception if the preconditions below are met:
> {noformat}
> java.lang.IllegalStateException
>       at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
>       at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
>       at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
>       at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
>       at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
>       at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
>       at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
>       at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
> {noformat}
> The scenario:
> We have implemented the logic for creating AMQP links with a 
> timeout-mechanism. That means that after invoking {{link.open()}} we wait for 
> a predefined time and if we haven't received the {{attach}} frame from the 
> server at that point, we call {{link.close()}} and then {{link.free()}} to 
> avoid having a memory leak.
> This has mostly worked well so far. When the server did not send the 
> {{attach}} frame in time, it usually just answered our {{sender.close()}} 
> with a {{detach}} frame (without sending an {{attach}} in between).
> But lately, when testing with a high number of links (>10000), we sometimes 
> saw the following server behaviour (with Qpid Dispatch Router as server):
> - client sends {{attach}} frame
> - linkEstablishmentTimeout: server doesn't respond in time so client invokes 
> {{link.close()}} and then {{link.free()}}
> - *server sends an attach frame*
> - server sends a detach frame
> If that happens multiple times on the same session, the above exception 
> occurs when calling {{link.free()}}. Afterwards there are 'socket closed' 
> exceptions.
> If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout 
> handling, the exception doesn't occur.
> But not invoking {{link.free()}} would create a memory leak if the server 
> doesn't respond at all to the {{attach}} frame.
> ---
> The issue can be reproduced with this test method (to be run as an additional 
> method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
> {code:java|collapse=true}
>     @Test
>     public void testFreeOnLinkEstablishmentTimeout() throws Exception {
>         LOGGER.fine(bold("======== About to create transports"));
>         getClient().transport = Proton.transport();
>         ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
>         getServer().transport = Proton.transport();
>         ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
>         getClient().connection = Proton.connection();
>         getClient().transport.bind(getClient().connection);
>         getServer().connection = Proton.connection();
>         getServer().transport.bind(getServer().connection);
>         LOGGER.fine(bold("======== About to open connections"));
>         getClient().connection.open();
>         getServer().connection.open();
>         doOutputInputCycle();
>         LOGGER.fine(bold("======== About to open session"));
>         getClient().session = getClient().connection.session();
>         getClient().session.open();
>         pumpClientToServer();
>         getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
>         assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
>         getServer().session.open();
>         assertEndpointState(getServer().session, ACTIVE, ACTIVE);
>         pumpServerToClient();
>         assertEndpointState(getClient().session, ACTIVE, ACTIVE);
>         for (int i = 0; i < 5; i++) {
>             LOGGER.fine("\n\n");
>             LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session)));
>             getClient().source = new Source();
>             getClient().source.setAddress(null);
>             getClient().target = new Target();
>             getClient().target.setAddress("myQueue");
>             getClient().sender = getClient().session.sender("sender" + i);
>             getClient().sender.setTarget(getClient().target);
>             getClient().sender.setSource(getClient().source);
>             getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
>             getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
>             assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
>             getClient().sender.open();
>             assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
>             pumpClientToServer();
>             LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling"));
>             getClient().sender.close();
>             // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received?
>             getClient().sender.free();
>             // write to outputBuffer already
>             ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
>             LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)"));
>             getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
>             getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
>             getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
>             org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
>             assertTerminusEquals(getClient().target, serverRemoteTarget);
>             getServer().receiver.setTarget(serverRemoteTarget);
>             assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
>             getServer().receiver.open();
>             assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
>             // write server attach to server outputBuffer already
>             ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
>             LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server"));
>             getServer().transport.getInputBuffer().put(clientBuffer);
>             getClient().transport.outputConsumed();
>             getServer().transport.processInput().checkIsOk();
>             LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client"));
>             getClient().transport.getInputBuffer().put(serverBuffer);
>             getClient().transport.processInput().checkIsOk();
>             getServer().transport.outputConsumed();
>             // assert the server got the detach
>             assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
>             // let the server respond with a detach
>             getServer().receiver.close();
>             LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
>             pumpServerToClient();
>             assertEndpointState(getClient().sender, CLOSED, CLOSED);
>             getClient().sender.free();
>         }
>         getClient().transport.unbind();
>         LOGGER.fine(bold("======== About to close and free client's connection"));
>         getClient().connection.close();
>         getClient().connection.free();
>     }
>     private Integer getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException {
>         if (!(protonSession instanceof EndpointImpl)) {
>             return null;
>         }
>         final Field refcountField = EndpointImpl.class.getDeclaredField("refcount");
>         refcountField.setAccessible(true);
>         return (Integer) refcountField.get(protonSession);
>     }
> {code}
> (Testcode using vertx-proton and separate client/server classes can also be 
> provided if needed.)


