This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 01be8de9 PROTON-2899 Add support for both netty 4.1 and 4.2
01be8de9 is described below
commit 01be8de9507613a40d305284d350c411b3e6ff58
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jul 17 13:39:04 2025 -0400
PROTON-2899 Add support for both netty 4.1 and 4.2
Allow the client to work with netty 4.2 and later while still
defaulting to testing and supporting netty 4.1 releases
---
.../client/transport/netty4/EpollSupport.java | 17 +-
.../client/transport/netty4/IOUringSupport.java | 99 +++++++++--
.../client/transport/netty4/KQueueSupport.java | 17 +-
.../client/transport/netty4/TcpTransportTest.java | 183 ++++++++++++---------
4 files changed, 229 insertions(+), 87 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
index 7f815c0b..c9408652 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
@@ -35,8 +35,12 @@ public final class EpollSupport {
public static final String NAME = "EPOLL";
public static boolean isAvailable(TransportOptions transportOptions) {
+ return transportOptions.allowNativeIO() && isAvailable();
+ }
+
+ public static boolean isAvailable() {
try {
- return transportOptions.allowNativeIO() && Epoll.isAvailable();
+ return Epoll.isAvailable();
} catch (NoClassDefFoundError ncdfe) {
LOG.debug("Unable to check for Epoll support due to missing class
definition", ncdfe);
return false;
@@ -44,10 +48,21 @@ public final class EpollSupport {
}
public static EventLoopGroup createGroup(int nThreads, ThreadFactory
ioThreadFactory) {
+ ensureAvailability();
+
return new EpollEventLoopGroup(nThreads, ioThreadFactory);
}
public static Class<? extends Channel> getChannelClass() {
+ ensureAvailability();
+
return EpollSocketChannel.class;
}
+
+ public static void ensureAvailability() {
+ if (!isAvailable()) {
+ throw new UnsupportedOperationException(
+ "Netty Epoll support is not enabled because the Netty library
indicates it is not present or disabled");
+ }
+ }
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
index bc33dc85..9e1d806b 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.protonj2.client.transport.netty4;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
import java.util.concurrent.ThreadFactory;
import org.apache.qpid.protonj2.client.TransportOptions;
@@ -24,30 +26,107 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
-import io.netty.incubator.channel.uring.IOUring;
-import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
-import io.netty.incubator.channel.uring.IOUringSocketChannel;
+@SuppressWarnings("unchecked")
public final class IOUringSupport {
private static final Logger LOG =
LoggerFactory.getLogger(IOUringSupport.class);
public static final String NAME = "IO_URING";
- public static boolean isAvailable(TransportOptions transportOptions) {
+ private static final boolean AVAILABLE;
+ private static final boolean INCUBATOR_VARIANT;
+ private static final Class<? extends Channel> SOCKET_CHANNEL_CLASS;
+ private static final Constructor<?> EVENTLOOP_CONSTRUCTOR;
+ private static final Method IOHANDLER_FACTORY;
+
+ static {
+ boolean available = false;
+ boolean incubator = false;
+ Class<? extends Channel> socketChannelClass = null;
+ Constructor<?> constructor = null;
+ Method ioHandlerFactory = null;
+
+ // Try for new Netty built in IoUring before falling back to incubator
checks
try {
- return transportOptions.allowNativeIO() && IOUring.isAvailable();
- } catch (NoClassDefFoundError ncdfe) {
- LOG.debug("Unable to check for IO_Uring support due to missing
class definition", ncdfe);
- return false;
+ final Class<?> ioUring =
Class.forName("io.netty.channel.uring.IoUring");
+ final Method isAvailable =
ioUring.getDeclaredMethod("isAvailable", (Class<?>[])null);
+ final Class<?> eventLoopGroup =
Class.forName("io.netty.channel.MultiThreadIoEventLoopGroup");
+ final Class<?> ioUringHandler =
Class.forName("io.netty.channel.uring.IoUringIoHandler");
+ final Class<?> ioUringHandlerFactory =
Class.forName("io.netty.channel.IoHandlerFactory");
+
+ constructor = eventLoopGroup.getDeclaredConstructor(int.class,
ThreadFactory.class, ioUringHandlerFactory);
+ ioHandlerFactory = ioUringHandler.getDeclaredMethod("newFactory");
+ socketChannelClass = (Class<? extends Channel>)
Class.forName("io.netty.channel.uring.IoUringSocketChannel");
+ available = (boolean) isAvailable.invoke(null);
+ } catch (Exception e) {
+ LOG.debug("Unable to enable netty io_uring support due to error",
e);
}
+
+ if (!available) {
+ try {
+ final Class<?> ioUring =
Class.forName("io.netty.incubator.channel.uring.IOUring");
+ final Method isAvailable =
ioUring.getDeclaredMethod("isAvailable");
+ final Class<?> eventLoopGroup =
Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup");
+
+ socketChannelClass = (Class<? extends Channel>)
Class.forName("io.netty.incubator.channel.uring.IOUringSocketChannel");
+ constructor = eventLoopGroup.getDeclaredConstructor(int.class,
ThreadFactory.class);
+ available = (boolean) isAvailable.invoke(null);
+ incubator = true;
+ } catch (Exception e) {
+ LOG.debug("Unable to enable netty incubator io_uring support
due to error", e);
+ }
+ }
+
+ AVAILABLE = available;
+ INCUBATOR_VARIANT = incubator;
+ SOCKET_CHANNEL_CLASS = socketChannelClass;
+ EVENTLOOP_CONSTRUCTOR = constructor;
+ IOHANDLER_FACTORY = ioHandlerFactory;
+ }
+
+ public static boolean isAvailable(TransportOptions transportOptions) {
+ return transportOptions.allowNativeIO() && AVAILABLE;
+ }
+
+ public static boolean isAvailable() {
+ return AVAILABLE;
}
public static EventLoopGroup createGroup(int nThreads, ThreadFactory
ioThreadFactory) {
- return new IOUringEventLoopGroup(nThreads, ioThreadFactory);
+ ensureAvailability();
+
+ Exception createError = null;
+
+ if (INCUBATOR_VARIANT) {
+ try {
+ return (EventLoopGroup)
EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory);
+ } catch (Exception e) {
+ LOG.debug("Unable to create Netty incubator io_uring
EventLoopGroup due to error", e);
+ createError = e;
+ }
+ } else {
+ try {
+ return (EventLoopGroup)
EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory,
IOHANDLER_FACTORY.invoke(null));
+ } catch (Exception e) {
+ LOG.debug("Unable to create Netty io_uring EventLoopGroup due
to error", e);
+ createError = e;
+ }
+ }
+
+ throw (Error) new UnsupportedOperationException("Netty io_uring failed
to create resource").initCause(createError);
}
public static Class<? extends Channel> getChannelClass() {
- return IOUringSocketChannel.class;
+ ensureAvailability();
+
+ return SOCKET_CHANNEL_CLASS;
+ }
+
+ public static void ensureAvailability() {
+ if (!AVAILABLE) {
+ throw new UnsupportedOperationException(
+ "Netty io_ring support is not enabled because the Netty
library indicates it is not present or disabled");
+ }
}
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
index 287da94d..4c8ef723 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
@@ -35,8 +35,12 @@ public final class KQueueSupport {
public static final String NAME = "KQUEUE";
public static boolean isAvailable(TransportOptions transportOptions) {
+ return transportOptions.allowNativeIO() && isAvailable();
+ }
+
+ public static boolean isAvailable() {
try {
- return transportOptions.allowNativeIO() && KQueue.isAvailable();
+ return KQueue.isAvailable();
} catch (NoClassDefFoundError ncdfe) {
LOG.debug("Unable to check for KQueue support due to missing class
definition", ncdfe);
return false;
@@ -44,10 +48,21 @@ public final class KQueueSupport {
}
public static EventLoopGroup createGroup(int nThreads, ThreadFactory
ioThreadFactory) {
+ ensureAvailability();
+
return new KQueueEventLoopGroup(nThreads, ioThreadFactory);
}
public static Class<? extends Channel> getChannelClass() {
+ ensureAvailability();
+
return KQueueSocketChannel.class;
}
+
+ public static void ensureAvailability() {
+ if (!isAvailable()) {
+ throw new UnsupportedOperationException(
+ "Netty KQueue support is not enabled because the Netty library
indicates it is not present or disabled");
+ }
+ }
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
index 8acea840..105479a7 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
@@ -57,14 +57,10 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.kqueue.KQueue;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
-import io.netty.incubator.channel.uring.IOUring;
-import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
+import io.netty.util.concurrent.EventExecutor;
/**
* Test basic functionality of the Netty based TCP transport.
@@ -862,7 +858,7 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
}
private void doTestEpollSupport(boolean useEpoll) throws Exception {
- assumeTrue(Epoll.isAvailable());
+ assumeTrue(EpollSupport.isAvailable());
try (NettyEchoServer server = createEchoServer()) {
server.start();
@@ -907,7 +903,7 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
}
private void doTestIORingSupport(boolean useIOUring) throws Exception {
- assumeTrue(IOUring.isAvailable());
+ assumeTrue(IOUringSupport.isAvailable());
try (NettyEchoServer server = createEchoServer()) {
server.start();
@@ -941,62 +937,6 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
assertTrue(data.isEmpty());
}
- private void assertEpoll(String message, boolean expected, Transport
transport) throws Exception {
- Field bootstrap = null;
- Class<?> transportType = transport.getClass();
-
- while (transportType != null && bootstrap == null) {
- try {
- bootstrap = transportType.getDeclaredField("bootstrap");
- } catch (NoSuchFieldException error) {
- transportType = transportType.getSuperclass();
- if (Object.class.equals(transportType)) {
- transportType = null;
- }
- }
- }
-
- assertNotNull(bootstrap, "Transport implementation unknown");
-
- bootstrap.setAccessible(true);
-
- Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport);
-
- if (expected) {
- assertTrue(transportBootstrap.config().group() instanceof
EpollEventLoopGroup, message);
- } else {
- assertFalse(transportBootstrap.config().group() instanceof
EpollEventLoopGroup, message);
- }
- }
-
- private void assertIOUring(String message, boolean expected, Transport
transport) throws Exception {
- Field bootstrap = null;
- Class<?> transportType = transport.getClass();
-
- while (transportType != null && bootstrap == null) {
- try {
- bootstrap = transportType.getDeclaredField("bootstrap");
- } catch (NoSuchFieldException error) {
- transportType = transportType.getSuperclass();
- if (Object.class.equals(transportType)) {
- transportType = null;
- }
- }
- }
-
- assertNotNull(bootstrap, "Transport implementation unknown");
-
- bootstrap.setAccessible(true);
-
- Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport);
-
- if (expected) {
- assertTrue(transportBootstrap.config().group() instanceof
IOUringEventLoopGroup, message);
- } else {
- assertFalse(transportBootstrap.config().group() instanceof
IOUringEventLoopGroup, message);
- }
- }
-
@Test
public void testConnectToServerWithKQueueEnabled() throws Exception {
doTestKQueueSupport(true);
@@ -1008,7 +948,7 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
}
private void doTestKQueueSupport(boolean useKQueue) throws Exception {
- assumeTrue(KQueue.isAvailable());
+ assumeTrue(KQueueSupport.isAvailable());
try (NettyEchoServer server = createEchoServer()) {
server.start();
@@ -1043,14 +983,21 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
@Test
public void testFallbackToNioWhenNativeIOConfiguredNotSupportedEpoll()
throws Exception {
- assumeFalse(Epoll.isAvailable());
+ assumeFalse(EpollSupport.isAvailable());
doTestFallbackToNioWhenNativeLayerNotSupported(EpollSupport.NAME);
}
+ @Test
+ public void testFallbackToNioWhenNativeIOConfiguredNotSupportedIOUring()
throws Exception {
+ assumeFalse(IOUringSupport.isAvailable());
+
+ doTestFallbackToNioWhenNativeLayerNotSupported(IOUringSupport.NAME);
+ }
+
@Test
public void testFallbackToNioWhenNativeIOConfiguredNotSupportedKQueue()
throws Exception {
- assumeFalse(KQueue.isAvailable());
+ assumeFalse(KQueueSupport.isAvailable());
doTestFallbackToNioWhenNativeLayerNotSupported(KQueueSupport.NAME);
}
@@ -1088,13 +1035,47 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
assertTrue(data.isEmpty());
}
+ private void assertEpoll(String message, boolean expected, Transport
transport) throws Exception {
+ final IOSubsystem ioHandler = extractIOSubsysten(transport);
+
+ if (expected) {
+ assertTrue(IOSubsystem.EPOLL.equals(ioHandler));
+ } else {
+ assertFalse(IOSubsystem.EPOLL.equals(ioHandler));
+ }
+ }
+
+ private void assertIOUring(String message, boolean expected, Transport
transport) throws Exception {
+ final IOSubsystem ioHandler = extractIOSubsysten(transport);
+
+ if (expected) {
+ assertTrue(IOSubsystem.IO_URING.equals(ioHandler));
+ } else {
+ assertFalse(IOSubsystem.IO_URING.equals(ioHandler));
+ }
+ }
+
private void assertKQueue(String message, boolean expected, Transport
transport) throws Exception {
- Field group = null;
+ final IOSubsystem ioHandler = extractIOSubsysten(transport);
+
+ if (expected) {
+ assertTrue(IOSubsystem.KQUEUE.equals(ioHandler));
+ } else {
+ assertFalse(IOSubsystem.KQUEUE.equals(ioHandler));
+ }
+ }
+
+ private enum IOSubsystem {
+ NIO, EPOLL, KQUEUE, IO_URING;
+ }
+
+ private IOSubsystem extractIOSubsysten(Transport transport) throws
Exception {
+ Field bootstrap = null;
Class<?> transportType = transport.getClass();
- while (transportType != null && group == null) {
+ while (transportType != null && bootstrap == null) {
try {
- group = transportType.getDeclaredField("group");
+ bootstrap = transportType.getDeclaredField("bootstrap");
} catch (NoSuchFieldException error) {
transportType = transportType.getSuperclass();
if (Object.class.equals(transportType)) {
@@ -1103,13 +1084,65 @@ public class TcpTransportTest extends
ImperativeClientTestCase {
}
}
- assertNotNull(group, "Transport implementation unknown");
+ assertNotNull(bootstrap, "Transport implementation unknown");
+
+ bootstrap.setAccessible(true);
+
+ final Bootstrap transportBootstrap = (Bootstrap)
bootstrap.get(transport);
+ final EventLoopGroup group = transportBootstrap.config().group();
+ final String eventLoopGroupName = group.getClass().getSimpleName();
+
+ // Prior to Netty 4.2 the EventLoopGroup implementation indicates the
IO layer in use
+ if (eventLoopGroupName.startsWith("Nio")) {
+ return IOSubsystem.NIO;
+ } else if (eventLoopGroupName.startsWith("Epoll")) {
+ return IOSubsystem.EPOLL;
+ } else if (eventLoopGroupName.startsWith("IOUring")) {
+ return IOSubsystem.IO_URING;
+ } else if (eventLoopGroupName.startsWith("KQueue")) {
+ return IOSubsystem.KQUEUE;
+ }
+
+ // Post Netty 4.2 we need to find the IOHandler that drives the
EventLoopGroup
+ Field children = null;
+ Class<?> groupType = group.getClass();
- group.setAccessible(true);
- if (expected) {
- assertTrue(group.get(transport) instanceof KQueueEventLoopGroup,
message);
+ while (groupType != null && children == null) {
+ try {
+ children = groupType.getDeclaredField("children");
+ } catch (NoSuchFieldException error) {
+ groupType = groupType.getSuperclass();
+ if (Object.class.equals(groupType)) {
+ groupType = null;
+ }
+ }
+ }
+
+ children.setAccessible(true);
+
+ final EventExecutor[] executors = (EventExecutor[])
children.get(group);
+ final EventExecutor executor = executors[0];
+ final Field ioHandlerField =
executor.getClass().getDeclaredField("ioHandler");
+
+ if (ioHandlerField != null) {
+ ioHandlerField.setAccessible(true);
+
+ final String ioHandlerName = ioHandlerField.get(executor) != null ?
+ ioHandlerField.get(executor) .getClass().getSimpleName() :
"Nio";
+
+ if (ioHandlerName.startsWith("Nio")) {
+ return IOSubsystem.NIO;
+ } else if (ioHandlerName.startsWith("Epoll")) {
+ return IOSubsystem.EPOLL;
+ } else if (ioHandlerName.startsWith("IoUring")) {
+ return IOSubsystem.IO_URING;
+ } else if (ioHandlerName.startsWith("KQueue")) {
+ return IOSubsystem.KQUEUE;
+ } else {
+ return IOSubsystem.NIO;
+ }
} else {
- assertFalse(group.get(transport) instanceof KQueueEventLoopGroup,
message);
+ return IOSubsystem.NIO; // Safe default since we found nothing we
recognize
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]