Carsten Lohmann created PROTON-2177:
---------------------------------------
Summary: IllegalStateException when freeing link as part of
timeout handling
Key: PROTON-2177
URL: https://issues.apache.org/jira/browse/PROTON-2177
Project: Qpid Proton
Issue Type: Bug
Components: proton-j
Affects Versions: proton-j-0.33.3
Reporter: Carsten Lohmann
Invoking free() on a link may result in such an exception if the preconditions
below are met:
{noformat}
java.lang.IllegalStateException
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
at
org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
at
org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
at
org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
at
org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
at
org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
at
org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
{noformat}
The scenario:
We have implemented the logic for creating AMQP links with a timeout-mechanism.
That means that after invoking {{link.open()}} we wait for a predefined time
and if we haven't received the {{attach}} frame from the server at that point,
we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak.
This has mostly worked well so far. In cases where the server was not sending
the attach frame in time, after calling {{sender.close()}} the server usually
finally responded with a {{detach}} frame (instead of sending an {{attach}} in
between).
But lately, when testing with a high number of links (>10000) we sometimes
encountered such a server behaviour (Qpid):
- client sends {{attach}} frame
- linkEstablishmentTimeout: server doesn't respond in time so client invokes
{{link.close()}} and then {{link.free()}}
- *server sends an attach frame*
- server sends a detach frame
If that happens mulitple times on the same session, the above exception occurs
when calling {{link.free()}}. Afterwards there are 'socked closed' exceptions.
If we don't invoke {{link.free}} as part of the linkEstablishmentTimeout
handling, the exception doesn't occur.
But not invoking {{link.free()}} would create a memory leak if the server
doesn't respond at all to the {{attach}} frame.
---
The issue can be reproduced with this test method (to be run as an additional
method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
{code:java|collapse=true}
@Test
public void testFreeOnLinkEstablishmentTimeout() throws Exception {
LOGGER.fine(bold("======== About to create transports"));
getClient().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getClient().transport,
TestLoggingHelper.CLIENT_PREFIX);
getServer().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "
" + TestLoggingHelper.SERVER_PREFIX);
getClient().connection = Proton.connection();
getClient().transport.bind(getClient().connection);
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open session"));
getClient().session = getClient().connection.session();
getClient().session.open();
pumpClientToServer();
getServer().session =
getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
getServer().session.open();
assertEndpointState(getServer().session, ACTIVE, ACTIVE);
pumpServerToClient();
assertEndpointState(getClient().session, ACTIVE, ACTIVE);
for (int i = 0; i < 5; i++) {
LOGGER.fine("\n\n");
LOGGER.fine(bold("======== About to create client sender " + i + ";
refcount on session: " + getSessionRefCount(getClient().session)));
getClient().source = new Source();
getClient().source.setAddress(null);
getClient().target = new Target();
getClient().target.setAddress("myQueue");
getClient().sender = getClient().session.sender("sender" + i);
getClient().sender.setTarget(getClient().target);
getClient().sender.setSource(getClient().source);
getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
assertEndpointState(getClient().sender, UNINITIALIZED,
UNINITIALIZED);
getClient().sender.open();
assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
pumpClientToServer();
LOGGER.fine(bold("======== client: invoke close & free already, as
would be done in linkEstablishmentTimeout handling"));
getClient().sender.close();
getClient().sender.free(); // skipping this, the error doesn't
occur; but how to prevent a memory leak if no attach frame ever is received?
// write to outputBuffer already
ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
LOGGER.fine(bold("======== About to set up server receiver (not
having gotten the detach yet)"));
getServer().receiver = (Receiver)
getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget =
getServer().receiver.getRemoteTarget();
assertTerminusEquals(getClient().target, serverRemoteTarget);
getServer().receiver.setTarget(serverRemoteTarget);
assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
getServer().receiver.open();
assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
// write server attach to server outputBuffer already
ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer
(with 'detach') to the server"));
getServer().transport.getInputBuffer().put(clientBuffer);
getClient().transport.outputConsumed();
getServer().transport.processInput().checkIsOk();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer
(with 'attach') to the client"));
getClient().transport.getInputBuffer().put(serverBuffer);
getClient().transport.processInput().checkIsOk();
getServer().transport.outputConsumed();
// assert the server got the detach
assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
// let the server respond with a detach
getServer().receiver.close();
LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
pumpServerToClient();
assertEndpointState(getClient().sender, CLOSED, CLOSED);
getClient().sender.free();
}
getClient().transport.unbind();
LOGGER.fine(bold("======== About to close and free client's
connection"));
getClient().connection.close();
getClient().connection.free();
}
private Integer getSessionRefCount(final Session protonSession) throws
NoSuchFieldException, IllegalAccessException {
if (!(protonSession instanceof EndpointImpl)) {
return null;
}
final Field refcountField =
EndpointImpl.class.getDeclaredField("refcount");
refcountField.setAccessible(true);
return (Integer) refcountField.get(protonSession);
}
{code}
(Testcode using vertx-proton and separate client/server classes can also be
provided if needed.)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]