This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 7b9cddbc PROTON-2900 Ensure idle timeout checks are scheduled on
remote open
7b9cddbc is described below
commit 7b9cddbc8c5cffc18c59d5427944925946157b13
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Jul 18 11:20:31 2025 -0400
PROTON-2900 Ensure idle timeout checks are scheduled on remote open
When the remote open performative arrives update the auto idle scheduled
task
if it exists such that a shorter remote idle timeout than the local value is
honored by the write check deadline computation.
---
.../protonj2/client/impl/ClientConnection.java | 3 +-
.../qpid/protonj2/client/impl/ConnectionTest.java | 27 ++++++++
.../org/apache/qpid/protonj2/engine/Engine.java | 12 +++-
.../protonj2/engine/impl/ProtonConnection.java | 4 +-
.../qpid/protonj2/engine/impl/ProtonEngine.java | 73 ++++++++++++++++------
.../protonj2/engine/impl/ProtonEngineTest.java | 11 +---
6 files changed, 99 insertions(+), 31 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index f0542a06..2a3ba67b 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -620,8 +620,6 @@ public final class ClientConnection implements Connection {
//----- Private implementation events handlers and utility methods
private void handleLocalOpen(org.apache.qpid.protonj2.engine.Connection
connection) {
- connection.tickAuto(getScheduler());
-
if (options.openTimeout() > 0) {
executor.schedule(() -> {
if (!openFuture.isDone()) {
@@ -921,6 +919,7 @@ public final class ClientConnection implements Connection {
.localCloseHandler(this::handleLocalClose)
.openHandler(this::handleRemoteOpen)
.closeHandler(this::handleRemoteClose);
+ protonConnection.tickAuto(getScheduler());
configureEngineSaslSupport();
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
index e6915238..bb5afaa9 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
@@ -1865,6 +1865,33 @@ public class ConnectionTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void
testConnectToRemoteWhoseIdleTimeoutIsShorterThanLocalSetValue() throws
Exception {
+ try (ProtonTestServer peer = new
ProtonTestServer(testServerOptions())) {
+ peer.expectSASLAnonymousConnect();
+
peer.expectOpen().withIdleTimeOut(15_000).respond().withIdleTimeOut(1000);
+ peer.expectEmptyFrame();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Connect test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options =
connectionOptions().idleTimeout(30_000);
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort(), options);
+
+ connection.openFuture().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose().respond();
+
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Disabled("Disabled due to requirement of hard coded port")
@Test
public void testLocalPortOption() throws Exception {
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
index f4bda43c..a0ad5136 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
@@ -187,12 +187,16 @@ public interface Engine extends Consumer<ProtonBuffer> {
* Allows the engine to manage idle timeout processing by providing it the
single threaded executor
* context where all transport work is done which ensures singled threaded
access while removing the
* need for the client library or server application to manage calls to
the {@link Engine#tick} methods.
+ * <p>
+ * The API should allow for configuring auto idle timeout handling before
the connection has opened and
+ * should react to both local and remote open performatives passing
through the engine to configure read
+ * and write checks under the constraints of the local and remote idle
timeout configurations.
*
* @param executor
* The single threaded execution context where all engine work takes
place.
*
* @throws IllegalStateException if the {@link Engine} is already
performing auto tick handling.
- * @throws EngineStateException if the Engine state precludes accepting
new input.
+ * @throws EngineStateException if the Engine state precludes accepting
new input (shutdown or failed).
*
* @return this {@link Engine}
*/
@@ -202,12 +206,16 @@ public interface Engine extends Consumer<ProtonBuffer> {
* Allows the engine to manage idle timeout processing by providing it the
single threaded executor
* context where all transport work is done which ensures singled threaded
access while removing the
* need for the client library or server application to manage calls to
the {@link Engine#tick} methods.
+ * <p>
+ * The API should allow for configuring auto idle timeout handling before
the connection has opened and
+ * should react to both local and remote open performatives passing
through the engine to configure read
+ * and write checks under the constraints of the local and remote idle
timeout configurations.
*
* @param scheduler
* The single threaded execution context where all engine work takes
place.
*
* @throws IllegalStateException if the {@link Engine} is already
performing auto tick handling.
- * @throws EngineStateException if the Engine state precludes accepting
new input.
+ * @throws EngineStateException if the Engine state precludes accepting
new input (shutdown or failed).
*
* @return this {@link Engine}
*/
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
index 185f0719..e22cee6a 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
@@ -94,7 +94,7 @@ public class ProtonConnection extends
ProtonEndpoint<Connection> implements Conn
* Create a new unbound Connection instance.
*
* @param engine
- * Parent engine that created and owns this {@link
Connection} insatnce.
+ * Parent engine that created and owns this {@link
Connection} instance.
*/
ProtonConnection(ProtonEngine engine) {
super(engine);
@@ -128,6 +128,7 @@ public class ProtonConnection extends
ProtonEndpoint<Connection> implements Conn
syncLocalStateWithRemote();
} finally {
fireLocalOpen();
+ engine.handleLocalOpen(this);
}
}
@@ -446,6 +447,7 @@ public class ProtonConnection extends
ProtonEndpoint<Connection> implements Conn
remoteOpen = open;
fireRemoteOpen();
+ engine.handleRemoteOpen(this);
}
@Override
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
index b7904e14..a4c8a93e 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
@@ -25,6 +25,7 @@ import java.util.function.BiConsumer;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.AMQPPerformativeEnvelopePool;
+import org.apache.qpid.protonj2.engine.Connection;
import org.apache.qpid.protonj2.engine.ConnectionState;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EnginePipeline;
@@ -232,20 +233,16 @@ public class ProtonEngine implements Engine {
Objects.requireNonNull(executor);
- if (connection.getState() != ConnectionState.ACTIVE) {
- throw new IllegalStateException("Cannot tick on a Connection that
is not opened.");
+ if (connection.getState() == ConnectionState.CLOSED) {
+ throw new IllegalStateException("Cannot tick on a Connection that
is closed.");
}
if (idleTimeoutExecutor != null) {
throw new IllegalStateException("Automatic ticking previously
initiated.");
}
- // TODO - As an additional feature of this method we could allow for
calling before connection is
- // opened such that it starts ticking either on open local and
also checks as a response to
- // remote open which seems might be needed anyway, see notes in
IdleTimeoutCheck class.
-
// Immediate run of the idle timeout check logic will decide
afterwards when / if we should
- // reschedule the idle timeout processing.
+ // reschedule the idle timeout processing based on the local and
remote state of the connection.
LOG.trace("Auto Idle Timeout Check being initiated");
idleTimeoutExecutor = executor;
idleTimeoutExecutor.execute(new IdleTimeoutCheck());
@@ -380,6 +377,48 @@ public class ProtonEngine implements Engine {
//----- Internal proton engine implementation
+ /**
+ * Called from the {@link Connection} that is linked to this engine when
the {@link Connection#open()}
+ * method is called and the connection has configured and updated its
state accordingly.
+ *
+ * @param connection
+ * The connection associated with this engine instance.
+ */
+ void handleLocalOpen(ProtonConnection connection) {
+ // When locally opened run the idle timeout check once after canceling
any
+ // currently scheduled instance to prevent any stacking of checks.
This will
+ // update the schedule to ensure local side idle timeouts get applied
on time.
+ if (idleTimeoutExecutor != null) {
+ if (nextIdleTimeoutCheck != null) {
+ nextIdleTimeoutCheck.cancel(false);
+ nextIdleTimeoutCheck = null;
+ }
+
+ idleTimeoutExecutor.execute(new IdleTimeoutCheck());
+ }
+ }
+
+ /**
+ * Called from the {@link Connection} that is linked to this engine after
is has received a remote
+ * Close performative and the connection has configured and updated its
state accordingly.
+ *
+ * @param connection
+ * The connection associated with this engine instance.
+ */
+ void handleRemoteOpen(ProtonConnection connection) {
+ // When remotely opened run the idle timeout check once after
canceling any
+ // currently scheduled instance to prevent any stacking of checks.
This will
+ // update the schedule to ensure remote side idle timeouts get applied
on time.
+ if (idleTimeoutExecutor != null) {
+ if (nextIdleTimeoutCheck != null) {
+ nextIdleTimeoutCheck.cancel(false);
+ nextIdleTimeoutCheck = null;
+ }
+
+ idleTimeoutExecutor.execute(new IdleTimeoutCheck());
+ }
+ }
+
ProtonEngine fireWrite(HeaderEnvelope frame) {
pipeline.fireWrite(frame);
return this;
@@ -507,15 +546,21 @@ public class ProtonEngine implements Engine {
private final class IdleTimeoutCheck implements Runnable {
- // TODO - Pick reasonable values
private final long MIN_IDLE_CHECK_INTERVAL = 1000;
private final long MAX_IDLE_CHECK_INTERVAL = 10000;
@Override
public void run() {
boolean checkScheduled = false;
+ boolean locallyOpen = connection.getState() ==
ConnectionState.ACTIVE;
- if (connection.getState() == ConnectionState.ACTIVE &&
!isShutdown()) {
+ // Ensure that any check currently scheduled is canceled and
cleared to prevent stacking.
+ if (nextIdleTimeoutCheck != null) {
+ nextIdleTimeoutCheck.cancel(false);
+ nextIdleTimeoutCheck = null;
+ }
+
+ if (locallyOpen && !isShutdown()) {
// Using nano time since it is not related to the wall clock,
which may change
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -527,13 +572,11 @@ public class ProtonEngine implements Engine {
// Check methods will close down the engine and fire error
so we need to check that engine
// state is active and engine is not shutdown before
scheduling again.
- if (deadline != 0 && connection.getState() ==
ConnectionState.ACTIVE && state() == EngineState.STARTED) {
+ if (deadline != 0 && locallyOpen && state() ==
EngineState.STARTED) {
// Run the next idle check at half the deadline to try
and ensure we meet our
// obligation of sending our heart beat on time.
long delay = (deadline - now) / 2;
- // TODO - Some computation to work out a reasonable
delay that still compensates for
- // errors in scheduling while preventing over
eagerness.
delay = Math.max(MIN_IDLE_CHECK_INTERVAL, delay);
delay = Math.min(MAX_IDLE_CHECK_INTERVAL, delay);
@@ -541,18 +584,12 @@ public class ProtonEngine implements Engine {
LOG.trace("IdleTimeoutCheck rescheduling with delay:
{}", delay);
nextIdleTimeoutCheck =
idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS);
}
-
- // TODO - If no local timeout but remote hasn't opened we
might return zero and not
- // schedule any ticking ? Possible solution is to
schedule after remote open
- // arrives if nothing set to run and remote
indicates it has an idle timeout.
-
} catch (Throwable t) {
LOG.trace("Auto Idle Timeout Check encountered error
during check: ", t);
}
}
if (!checkScheduled) {
- nextIdleTimeoutCheck = null;
LOG.trace("Auto Idle Timeout Check task exiting and will not
be rescheduled");
}
}
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
index 21a465c5..c31f65e1 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
@@ -367,13 +367,8 @@ public class ProtonEngineTest extends
ProtonEngineTestSupport {
}
@Test
- public void testAutoTickFailsWhenConnectionNotOpenedNoLocalIdleSet()
throws EngineStateException {
- doTestAutoTickFailsBasedOnState(false, false, false, false);
- }
-
- @Test
- public void testAutoTickFailsWhenConnectionNotOpenedLocalIdleSet() throws
EngineStateException {
- doTestAutoTickFailsBasedOnState(true, false, false, false);
+ public void
testAutoTickFailsWhenConnectionNotClosedButEngineShutdownNoLocalIdleSet()
throws EngineStateException {
+ doTestAutoTickFailsBasedOnState(false, true, false, true);
}
@Test
@@ -418,7 +413,7 @@ public class ProtonEngineTest extends
ProtonEngineTestSupport {
try {
engine.tickAuto(Mockito.mock(ScheduledExecutorService.class));
- fail("Should not be able to tick an unopened connection");
+ fail("Should not be able to tick an closed connection or shutdown
engine");
} catch (IllegalStateException | EngineShutdownException error) {
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]