[
https://issues.apache.org/jira/browse/PROTON-2177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Carsten Lohmann updated PROTON-2177:
------------------------------------
Description:
Invoking free() on a link may result in the following exception if the
preconditions below are met:
{noformat}
java.lang.IllegalStateException
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
at
org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
at
org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
at
org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
at
org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
at
org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
at
org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
{noformat}
The scenario:
We have implemented the logic for creating AMQP links with a timeout-mechanism.
That means that after invoking {{link.open()}} we wait for a predefined time
and if we haven't received the {{attach}} frame from the server at that point,
we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak.
This has mostly worked well so far. In cases where the server did not send
the attach frame in time, the server usually responded to {{sender.close()}}
with a {{detach}} frame eventually (without sending an {{attach}} in
between).
But lately, when testing with a high number of links (>10000), we sometimes
encountered the following server behaviour (with Qpid Dispatch Router as server):
- client sends {{attach}} frame
- linkEstablishmentTimeout: server doesn't respond in time so client invokes
{{link.close()}} and then {{link.free()}}
- *server sends an attach frame*
- server sends a detach frame
If that happens multiple times on the same session, the above exception occurs
when calling {{link.free()}}. Afterwards there are 'socket closed' exceptions.
If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout
handling, the exception doesn't occur.
But not invoking {{link.free()}} would create a memory leak if the server
doesn't respond at all to the {{attach}} frame.
---
The issue can be reproduced with this test method (to be run as an additional
method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
{code:java|collapse=true}
@Test
public void testFreeOnLinkEstablishmentTimeout() throws Exception {
LOGGER.fine(bold("======== About to create transports"));
getClient().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getClient().transport,
TestLoggingHelper.CLIENT_PREFIX);
getServer().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "
" + TestLoggingHelper.SERVER_PREFIX);
getClient().connection = Proton.connection();
getClient().transport.bind(getClient().connection);
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open session"));
getClient().session = getClient().connection.session();
getClient().session.open();
pumpClientToServer();
getServer().session =
getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
getServer().session.open();
assertEndpointState(getServer().session, ACTIVE, ACTIVE);
pumpServerToClient();
assertEndpointState(getClient().session, ACTIVE, ACTIVE);
for (int i = 0; i < 5; i++) {
LOGGER.fine("\n\n");
LOGGER.fine(bold("======== About to create client sender " + i + ";
refcount on session: " + getSessionRefCount(getClient().session)));
getClient().source = new Source();
getClient().source.setAddress(null);
getClient().target = new Target();
getClient().target.setAddress("myQueue");
getClient().sender = getClient().session.sender("sender" + i);
getClient().sender.setTarget(getClient().target);
getClient().sender.setSource(getClient().source);
getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
assertEndpointState(getClient().sender, UNINITIALIZED,
UNINITIALIZED);
getClient().sender.open();
assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
pumpClientToServer();
LOGGER.fine(bold("======== client: invoke close & free already, as
would be done in linkEstablishmentTimeout handling"));
getClient().sender.close();
getClient().sender.free(); // skipping this, the error doesn't
occur; but how to prevent a memory leak if no attach frame ever is received?
// write to outputBuffer already
ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
LOGGER.fine(bold("======== About to set up server receiver (not
having gotten the detach yet)"));
getServer().receiver = (Receiver)
getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget =
getServer().receiver.getRemoteTarget();
assertTerminusEquals(getClient().target, serverRemoteTarget);
getServer().receiver.setTarget(serverRemoteTarget);
assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
getServer().receiver.open();
assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
// write server attach to server outputBuffer already
ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer
(with 'detach') to the server"));
getServer().transport.getInputBuffer().put(clientBuffer);
getClient().transport.outputConsumed();
getServer().transport.processInput().checkIsOk();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer
(with 'attach') to the client"));
getClient().transport.getInputBuffer().put(serverBuffer);
getClient().transport.processInput().checkIsOk();
getServer().transport.outputConsumed();
// assert the server got the detach
assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
// let the server respond with a detach
getServer().receiver.close();
LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
pumpServerToClient();
assertEndpointState(getClient().sender, CLOSED, CLOSED);
getClient().sender.free();
}
getClient().transport.unbind();
LOGGER.fine(bold("======== About to close and free client's
connection"));
getClient().connection.close();
getClient().connection.free();
}
/**
 * Reads the private {@code refcount} field of a session via reflection so the
 * test can log it.
 *
 * @param protonSession the session to inspect
 * @return the session's reference count, or {@code null} when the session is
 *         not an {@code EndpointImpl}
 * @throws NoSuchFieldException if {@code EndpointImpl} has no {@code refcount} field
 * @throws IllegalAccessException if the field cannot be read
 */
private Integer getSessionRefCount(final Session protonSession)
        throws NoSuchFieldException, IllegalAccessException {
    if (protonSession instanceof EndpointImpl) {
        final Field counter = EndpointImpl.class.getDeclaredField("refcount");
        counter.setAccessible(true);
        return (Integer) counter.get(protonSession);
    }
    return null;
}
{code}
(Testcode using vertx-proton and separate client/server classes can also be
provided if needed.)
was:
Invoking free() on a link may result in the following exception if the
preconditions below are met:
{noformat}
java.lang.IllegalStateException
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
at
org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
at
org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
at
org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
at
org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
at
org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
at
org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
at
org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
at
org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
{noformat}
The scenario:
We have implemented the logic for creating AMQP links with a timeout-mechanism.
That means that after invoking {{link.open()}} we wait for a predefined time
and if we haven't received the {{attach}} frame from the server at that point,
we call {{link.close()}} and then {{link.free()}} to avoid having a memory leak.
This has mostly worked well so far. In cases where the server did not send
the attach frame in time, the server usually responded to {{sender.close()}}
with a {{detach}} frame eventually (without sending an {{attach}} in
between).
But lately, when testing with a high number of links (>10000), we sometimes
encountered the following server behaviour (Qpid):
- client sends {{attach}} frame
- linkEstablishmentTimeout: server doesn't respond in time so client invokes
{{link.close()}} and then {{link.free()}}
- *server sends an attach frame*
- server sends a detach frame
If that happens multiple times on the same session, the above exception occurs
when calling {{link.free()}}. Afterwards there are 'socket closed' exceptions.
If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout
handling, the exception doesn't occur.
But not invoking {{link.free()}} would create a memory leak if the server
doesn't respond at all to the {{attach}} frame.
---
The issue can be reproduced with this test method (to be run as an additional
method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
{code:java|collapse=true}
@Test
public void testFreeOnLinkEstablishmentTimeout() throws Exception {
LOGGER.fine(bold("======== About to create transports"));
getClient().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getClient().transport,
TestLoggingHelper.CLIENT_PREFIX);
getServer().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "
" + TestLoggingHelper.SERVER_PREFIX);
getClient().connection = Proton.connection();
getClient().transport.bind(getClient().connection);
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open session"));
getClient().session = getClient().connection.session();
getClient().session.open();
pumpClientToServer();
getServer().session =
getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
getServer().session.open();
assertEndpointState(getServer().session, ACTIVE, ACTIVE);
pumpServerToClient();
assertEndpointState(getClient().session, ACTIVE, ACTIVE);
for (int i = 0; i < 5; i++) {
LOGGER.fine("\n\n");
LOGGER.fine(bold("======== About to create client sender " + i + ";
refcount on session: " + getSessionRefCount(getClient().session)));
getClient().source = new Source();
getClient().source.setAddress(null);
getClient().target = new Target();
getClient().target.setAddress("myQueue");
getClient().sender = getClient().session.sender("sender" + i);
getClient().sender.setTarget(getClient().target);
getClient().sender.setSource(getClient().source);
getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
assertEndpointState(getClient().sender, UNINITIALIZED,
UNINITIALIZED);
getClient().sender.open();
assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
pumpClientToServer();
LOGGER.fine(bold("======== client: invoke close & free already, as
would be done in linkEstablishmentTimeout handling"));
getClient().sender.close();
getClient().sender.free(); // skipping this, the error doesn't
occur; but how to prevent a memory leak if no attach frame ever is received?
// write to outputBuffer already
ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
LOGGER.fine(bold("======== About to set up server receiver (not
having gotten the detach yet)"));
getServer().receiver = (Receiver)
getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget =
getServer().receiver.getRemoteTarget();
assertTerminusEquals(getClient().target, serverRemoteTarget);
getServer().receiver.setTarget(serverRemoteTarget);
assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
getServer().receiver.open();
assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
// write server attach to server outputBuffer already
ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer
(with 'detach') to the server"));
getServer().transport.getInputBuffer().put(clientBuffer);
getClient().transport.outputConsumed();
getServer().transport.processInput().checkIsOk();
LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer
(with 'attach') to the client"));
getClient().transport.getInputBuffer().put(serverBuffer);
getClient().transport.processInput().checkIsOk();
getServer().transport.outputConsumed();
// assert the server got the detach
assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
// let the server respond with a detach
getServer().receiver.close();
LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
pumpServerToClient();
assertEndpointState(getClient().sender, CLOSED, CLOSED);
getClient().sender.free();
}
getClient().transport.unbind();
LOGGER.fine(bold("======== About to close and free client's
connection"));
getClient().connection.close();
getClient().connection.free();
}
/**
 * Reads the private {@code refcount} field of a session via reflection so the
 * test can log it.
 *
 * @param protonSession the session to inspect
 * @return the session's reference count, or {@code null} when the session is
 *         not an {@code EndpointImpl}
 * @throws NoSuchFieldException if {@code EndpointImpl} has no {@code refcount} field
 * @throws IllegalAccessException if the field cannot be read
 */
private Integer getSessionRefCount(final Session protonSession)
        throws NoSuchFieldException, IllegalAccessException {
    if (protonSession instanceof EndpointImpl) {
        final Field counter = EndpointImpl.class.getDeclaredField("refcount");
        counter.setAccessible(true);
        return (Integer) counter.get(protonSession);
    }
    return null;
}
{code}
(Testcode using vertx-proton and separate client/server classes can also be
provided if needed.)
> IllegalStateException when freeing link as part of timeout handling
> -------------------------------------------------------------------
>
> Key: PROTON-2177
> URL: https://issues.apache.org/jira/browse/PROTON-2177
> Project: Qpid Proton
> Issue Type: Bug
> Components: proton-j
> Affects Versions: proton-j-0.33.3
> Reporter: Carsten Lohmann
> Priority: Major
>
> Invoking free() on a link may result in the following exception if the
> preconditions below are met:
> {noformat}
> java.lang.IllegalStateException
> at
> org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
> at
> org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
> at
> org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
> at
> org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
> at
> org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
> at
> org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
> at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
> at
> org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
> at
> org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
> at
> org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
> at
> org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
> at
> org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
> {noformat}
> The scenario:
> We have implemented the logic for creating AMQP links with a
> timeout-mechanism. That means that after invoking {{link.open()}} we wait for
> a predefined time and if we haven't received the {{attach}} frame from the
> server at that point, we call {{link.close()}} and then {{link.free()}} to
> avoid having a memory leak.
> This has mostly worked well so far. In cases where the server did not send
> the attach frame in time, the server usually responded to {{sender.close()}}
> with a {{detach}} frame eventually (without sending an {{attach}} in
> between).
> But lately, when testing with a high number of links (>10000), we sometimes
> encountered the following server behaviour (with Qpid Dispatch Router as server):
> - client sends {{attach}} frame
> - linkEstablishmentTimeout: server doesn't respond in time so client invokes
> {{link.close()}} and then {{link.free()}}
> - *server sends an attach frame*
> - server sends a detach frame
> If that happens multiple times on the same session, the above exception
> occurs when calling {{link.free()}}. Afterwards there are 'socket closed'
> exceptions.
> If we don't invoke {{link.free()}} as part of the linkEstablishmentTimeout
> handling, the exception doesn't occur.
> But not invoking {{link.free()}} would create a memory leak if the server
> doesn't respond at all to the {{attach}} frame.
> ---
> The issue can be reproduced with this test method (to be run as an additional
> method in the "org.apache.qpid.proton.systemtests.FreeTest" class)
> {code:java|collapse=true}
> @Test
> public void testFreeOnLinkEstablishmentTimeout() throws Exception {
> LOGGER.fine(bold("======== About to create transports"));
> getClient().transport = Proton.transport();
> ProtocolTracerEnabler.setProtocolTracer(getClient().transport,
> TestLoggingHelper.CLIENT_PREFIX);
> getServer().transport = Proton.transport();
> ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "
> " + TestLoggingHelper.SERVER_PREFIX);
> getClient().connection = Proton.connection();
> getClient().transport.bind(getClient().connection);
> getServer().connection = Proton.connection();
> getServer().transport.bind(getServer().connection);
> LOGGER.fine(bold("======== About to open connections"));
> getClient().connection.open();
> getServer().connection.open();
> doOutputInputCycle();
> LOGGER.fine(bold("======== About to open session"));
> getClient().session = getClient().connection.session();
> getClient().session.open();
> pumpClientToServer();
> getServer().session =
> getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
> assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
> getServer().session.open();
> assertEndpointState(getServer().session, ACTIVE, ACTIVE);
> pumpServerToClient();
> assertEndpointState(getClient().session, ACTIVE, ACTIVE);
> for (int i = 0; i < 5; i++) {
> LOGGER.fine("\n\n");
> LOGGER.fine(bold("======== About to create client sender " + i +
> "; refcount on session: " + getSessionRefCount(getClient().session)));
> getClient().source = new Source();
> getClient().source.setAddress(null);
> getClient().target = new Target();
> getClient().target.setAddress("myQueue");
> getClient().sender = getClient().session.sender("sender" + i);
> getClient().sender.setTarget(getClient().target);
> getClient().sender.setSource(getClient().source);
>
> getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
>
> getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
> assertEndpointState(getClient().sender, UNINITIALIZED,
> UNINITIALIZED);
> getClient().sender.open();
> assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
> pumpClientToServer();
> LOGGER.fine(bold("======== client: invoke close & free already,
> as would be done in linkEstablishmentTimeout handling"));
> getClient().sender.close();
> getClient().sender.free(); // skipping this, the error doesn't
> occur; but how to prevent a memory leak if no attach frame ever is received?
> // write to outputBuffer already
> ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
> LOGGER.fine(bold("======== About to set up server receiver (not
> having gotten the detach yet)"));
> getServer().receiver = (Receiver)
> getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
>
> getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
>
> getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
> org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget =
> getServer().receiver.getRemoteTarget();
> assertTerminusEquals(getClient().target, serverRemoteTarget);
> getServer().receiver.setTarget(serverRemoteTarget);
> assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
> getServer().receiver.open();
> assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
> // write server attach to server outputBuffer already
> ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
> LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent
> clientBuffer (with 'detach') to the server"));
> getServer().transport.getInputBuffer().put(clientBuffer);
> getClient().transport.outputConsumed();
> getServer().transport.processInput().checkIsOk();
> LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent
> serverBuffer (with 'attach') to the client"));
> getClient().transport.getInputBuffer().put(serverBuffer);
> getClient().transport.processInput().checkIsOk();
> getServer().transport.outputConsumed();
> // assert the server got the detach
> assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
> // let the server respond with a detach
> getServer().receiver.close();
> LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
> pumpServerToClient();
> assertEndpointState(getClient().sender, CLOSED, CLOSED);
> getClient().sender.free();
> }
> getClient().transport.unbind();
> LOGGER.fine(bold("======== About to close and free client's
> connection"));
> getClient().connection.close();
> getClient().connection.free();
> }
> /**
>  * Reads the private {@code refcount} field of a session via reflection so the
>  * test can log it.
>  *
>  * @param protonSession the session to inspect
>  * @return the session's reference count, or {@code null} when the session is
>  *         not an {@code EndpointImpl}
>  * @throws NoSuchFieldException if {@code EndpointImpl} has no {@code refcount} field
>  * @throws IllegalAccessException if the field cannot be read
>  */
> private Integer getSessionRefCount(final Session protonSession)
>         throws NoSuchFieldException, IllegalAccessException {
>     if (protonSession instanceof EndpointImpl) {
>         final Field counter = EndpointImpl.class.getDeclaredField("refcount");
>         counter.setAccessible(true);
>         return (Integer) counter.get(protonSession);
>     }
>     return null;
> }
> {code}
> (Testcode using vertx-proton and separate client/server classes can also be
> provided if needed.)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]