This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 8e841980 PROTON-2899 Updates ahead of move to Netty 4.2.x
8e841980 is described below
commit 8e8419803c4c666dd3fe8e3f3a141ab7afe34b21
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Jul 14 15:53:55 2025 -0400
PROTON-2899 Updates ahead of move to Netty 4.2.x
Stabilize some tests with intermittent failures and replace uses of
now-deprecated APIs with their non-deprecated equivalents ahead of the
move to a newer release where they could be removed. Also includes other
minor code cleanups to prepare for future Netty updates.
---
.../client/transport/netty4/TcpTransport.java | 11 ++-
.../transport/netty4/WebSocketTransport.java | 2 +-
.../client/transport/netty4/NettyServer.java | 8 +-
.../client/transport/netty4/TcpTransportTest.java | 3 +-
.../protonj2/test/driver/ProtonTestServer.java | 4 +
.../test/driver/netty/netty4/Netty4Client.java | 2 +-
.../test/driver/netty/netty4/Netty4Server.java | 2 +-
.../protonj2/test/driver/ProtonTestClientTest.java | 3 +
.../qpid/protonj2/test/driver/utils/Wait.java | 105 +++++++++++++++++++++
9 files changed, 134 insertions(+), 6 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
index a21f06e6..13d2c55e 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -154,7 +155,15 @@ public class TcpTransport implements Transport {
configureNetty(bootstrap, options);
- bootstrap.connect(getHost(),
getPort()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ bootstrap.connect(getHost(), getPort()).addListener(new
ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws
Exception {
+ if (!future.isSuccess()) {
+ handleTransportFailure(future.channel(), future.cause());
+ }
+ }
+ });
return this;
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
index 5d3aae6c..9e28d65f 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java
@@ -153,7 +153,7 @@ public class WebSocketTransport extends TcpTransport {
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(8192));
if (options.webSocketCompression()) {
- pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
+ pipeline.addLast(new WebSocketClientCompressionHandler(0));
}
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
index 87fbaac3..52b7273e 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/NettyServer.java
@@ -224,7 +224,7 @@ public abstract class NettyServer implements AutoCloseable {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
if (isUseWebSocketCompression()) {
- ch.pipeline().addLast(new
WebSocketServerCompressionHandler());
+ ch.pipeline().addLast(new
WebSocketServerCompressionHandler(0));
}
ch.pipeline().addLast(new
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
}
@@ -250,6 +250,8 @@ public abstract class NettyServer implements AutoCloseable {
serverChannel.close().sync();
} catch (InterruptedException e) {
LOG.trace("Error on server channel close:", e);
+ } finally {
+ serverChannel = null;
}
// Shut down all event loops to terminate all threads.
@@ -261,6 +263,10 @@ public abstract class NettyServer implements AutoCloseable
{
LOG.trace("Shutting down worker group");
workerGroup.shutdownGracefully(0, timeout,
TimeUnit.MILLISECONDS).awaitUninterruptibly(timeout);
LOG.trace("Worker group shut down");
+
+ // allow a chance for full termination
+ bossGroup.awaitTermination(10, TimeUnit.MILLISECONDS);
+ workerGroup.awaitTermination(10, TimeUnit.MILLISECONDS);
}
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
index b7e2b669..8acea840 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -79,7 +80,7 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
protected volatile boolean transportInitialized;
protected volatile boolean transportConnected;
protected volatile boolean transportErrored;
- protected final List<Throwable> exceptions = new ArrayList<>();
+ protected final List<Throwable> exceptions =
Collections.synchronizedList(new ArrayList<>());
protected final List<ProtonBuffer> data = new ArrayList<>();
protected final AtomicInteger bytesRead = new AtomicInteger();
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index 9b7d6420..a3b55997 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -182,6 +182,10 @@ public class ProtonTestServer extends ProtonTestPeer {
return server.isWSCompressionActive();
}
+ public boolean hasClientConnection() {
+ return server.hasClientConnection();
+ }
+
@Override
public AMQPTestDriver getDriver() {
return driver;
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
index b31c6e90..baef09ea 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Client.java
@@ -507,7 +507,7 @@ public final class Netty4Client implements NettyClient {
channel.pipeline().addLast(new HttpObjectAggregator(8192));
if (options.isWebSocketCompression()) {
channel.pipeline().addLast(new ClientWSCompressionObserver());
-
channel.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
+ channel.pipeline().addLast(new
WebSocketClientCompressionHandler(0));
}
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
index 94a891dc..bae6f674 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
@@ -284,7 +284,7 @@ public final class Netty4Server implements NettyServer {
ch.pipeline().addLast(new HttpObjectAggregator(65536));
if (options.isWebSocketCompression()) {
ch.pipeline().addLast(new
ServerWSCompressionObserver());
- ch.pipeline().addLast(new
WebSocketServerCompressionHandler());
+ ch.pipeline().addLast(new
WebSocketServerCompressionHandler(0));
}
ch.pipeline().addLast(new
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true, maxFrameSize));
}
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index e4036867..95192f44 100644
---
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -27,6 +27,7 @@ import
org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
import org.apache.qpid.protonj2.test.driver.utils.TestPeerTestsBase;
+import org.apache.qpid.protonj2.test.driver.utils.Wait;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -129,6 +130,8 @@ class ProtonTestClientTest extends TestPeerTestsBase {
client.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
+ Wait.assertFalse(() -> peer.hasClientConnection());
+
try (ProtonTestClient client = new ProtonTestClient()) {
client.connect(remoteURI.getHost(), remoteURI.getPort());
client.expectAMQPHeader();
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java
new file mode 100644
index 00000000..9e3d0d35
--- /dev/null
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/utils/Wait.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.protonj2.test.driver.utils;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+
+public class Wait {
+
+ public static final long MAX_WAIT_MILLIS = 10 * 1000;
+ public static final long SLEEP_MILLIS = 50;
+ public static final String DEFAULT_FAILURE_MESSAGE = "Expected condition
was not met";
+
+ @FunctionalInterface
+ public interface Condition {
+ boolean isSatisfied() throws Exception;
+ }
+
+ public static void assertTrue(Condition condition) {
+ assertTrue(DEFAULT_FAILURE_MESSAGE, condition);
+ }
+
+ public static void assertFalse(Condition condition) throws Exception {
+ assertTrue(() -> !condition.isSatisfied());
+ }
+
+ public static void assertFalse(String failureMessage, Condition condition)
{
+ assertTrue(failureMessage, () -> !condition.isSatisfied());
+ }
+
+ public static void assertFalse(String failureMessage, Condition condition,
final long duration) {
+ assertTrue(failureMessage, () -> !condition.isSatisfied(), duration,
SLEEP_MILLIS);
+ }
+
+ public static void assertFalse(Condition condition, final long duration,
final long sleep) {
+ assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(),
duration, sleep);
+ }
+
+ public static void assertTrue(Condition condition, final long duration) {
+ assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS);
+ }
+
+ public static void assertTrue(String failureMessage, Condition condition) {
+ assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
+ }
+
+ public static void assertTrue(String failureMessage, Condition condition,
final long duration) {
+ assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
+ }
+
+ public static void assertTrue(Condition condition, final long duration,
final long sleep) throws Exception {
+ assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
+ }
+
+ public static void assertTrue(String failureMessage, Condition condition,
final long duration, final long sleep) {
+ boolean result = waitFor(condition, duration, sleep);
+
+ if (!result) {
+ fail(failureMessage);
+ }
+ }
+
+ public static boolean waitFor(Condition condition) throws Exception {
+ return waitFor(condition, MAX_WAIT_MILLIS);
+ }
+
+ public static boolean waitFor(final Condition condition, final long
duration) throws Exception {
+ return waitFor(condition, duration, SLEEP_MILLIS);
+ }
+
+ public static boolean waitFor(final Condition condition, final long
durationMillis, final long sleepMillis) {
+ try {
+ final long expiry = System.currentTimeMillis() + durationMillis;
+ boolean conditionSatisfied = condition.isSatisfied();
+
+ while (!conditionSatisfied && System.currentTimeMillis() < expiry)
{
+ if (sleepMillis == 0) {
+ Thread.yield();
+ } else {
+ TimeUnit.MILLISECONDS.sleep(sleepMillis);
+ }
+ conditionSatisfied = condition.isSatisfied();
+ }
+
+ return conditionSatisfied;
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]