This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ce849f95 [rpc] Prevent EventLoop blocking and implement TCP-level 
backpressure in RequestChannel (#2065)
4ce849f95 is described below

commit 4ce849f950504ccb7941d2816cd8a5413f613950
Author: Yang Wang <[email protected]>
AuthorDate: Tue Dec 2 19:45:33 2025 +0800

    [rpc] Prevent EventLoop blocking and implement TCP-level backpressure in 
RequestChannel (#2065)
---
 .../fluss/rpc/netty/server/NettyServerHandler.java |   9 +
 .../fluss/rpc/netty/server/RequestChannel.java     | 260 +++++++++-
 .../fluss/rpc/protocol/RequestChannelTest.java     | 537 +++++++++++++++++++++
 3 files changed, 793 insertions(+), 13 deletions(-)

diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
index 5501ba4c9..3d22c3523 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
@@ -170,12 +170,21 @@ public final class NettyServerHandler extends 
ChannelInboundHandlerAdapter {
                 authenticator.isCompleted()
                         ? ConnectionState.READY
                         : ConnectionState.AUTHENTICATING);
+
+        // Register this channel with its RequestChannel. The RequestChannel 
will manage this
+        // channel's lifecycle and backpressure state.
+        requestChannel.registerChannel(ctx.channel());
+
         // TODO: connection metrics (count, client tags, receive request avg 
idle time, etc.)
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
+
+        // Unregister this channel from its RequestChannel. The RequestChannel 
will clean up both
+        // the association and any paused state.
+        requestChannel.unregisterChannel(ctx.channel());
     }
 
     @Override
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
index 94cdec1c6..f855c89e5 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
@@ -17,59 +17,293 @@
 
 package org.apache.fluss.rpc.netty.server;
 
+import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
 
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
-/** A blocking queue channel that can receive requests and send responses. */
+/**
+ * A queue channel that can receive requests and send responses.
+ *
+ * <p>Uses an unbounded LinkedBlockingQueue to ensure that putRequest() never 
blocks, preventing
+ * EventLoop threads from being blocked. Backpressure is applied at the TCP 
level by pausing channel
+ * reads when the queue size exceeds the backpressure threshold.
+ *
+ * <p>Each RequestChannel instance manages its own associated Netty channels 
(those hashed to this
+ * RequestChannel) and independently controls their backpressure state. This 
design encapsulates all
+ * backpressure logic within the RequestChannel, eliminating the need for 
global state management.
+ */
 @ThreadSafe
 public class RequestChannel {
     private static final Logger LOG = 
LoggerFactory.getLogger(RequestChannel.class);
 
+    /** Unbounded blocking queue to hold incoming requests. Never blocks on 
put. */
     protected final BlockingQueue<RpcRequest> requestQueue;
 
-    public RequestChannel(int queueCapacity) {
-        this.requestQueue = new ArrayBlockingQueue<>(queueCapacity);
+    /**
+     * The threshold at which backpressure should be applied (pausing channel 
reads). When queue
+     * size exceeds this value, channels should be paused to prevent memory 
exhaustion.
+     */
+    private final int backpressureThreshold;
+
+    /**
+     * The threshold at which to resume paused channels. Set to 50% of 
backpressureThreshold to
+     * provide hysteresis and avoid thrashing.
+     */
+    private final int resumeThreshold;
+
+    /**
+     * All Netty channels that are hashed to this RequestChannel. Channels are 
registered when they
+     * become active and unregistered when they become inactive.
+     *
+     * <p>When backpressure is applied, ALL channels in this set are paused 
simultaneously. When
+     * backpressure is released, ALL channels are resumed simultaneously.
+     */
+    private final Set<Channel> associatedChannels = 
ConcurrentHashMap.newKeySet();
+
+    /**
+     * Indicates whether backpressure is currently active. When true, all 
associated channels have
+     * been paused (setAutoRead(false)). When false, all channels are running 
normally.
+     *
+     * <p>Volatile ensures visibility for fast-path reads (outside the lock). 
All modifications are
+     * protected by backpressureLock, so atomicity is guaranteed by the lock, 
not by atomic
+     * operations.
+     */
+    private volatile boolean isBackpressureActive = false;
+
+    /**
+     * Lock to protect backpressure state transitions and task submissions. 
This lock ensures that:
+     * (1) state checks and task submissions are atomic (preventing permanent 
channel blocking),
+     * and (2) pause and resume operations are mutually exclusive. Channel 
registration does
+     * not take this lock: registerChannel() only adds the channel to the set.
+     *
+     * <p>The lock eliminates the need for CAS operations - simple boolean 
checks and assignments
+     * under the lock are sufficient for correctness.
+     */
+    private final ReentrantLock backpressureLock = new ReentrantLock();
+
+    public RequestChannel(int backpressureThreshold) {
+        this.requestQueue = new LinkedBlockingQueue<>();
+        this.backpressureThreshold = backpressureThreshold;
+        this.resumeThreshold = backpressureThreshold / 2;
     }
 
     /**
-     * Send a request to be handled, potentially blocking until there is room 
in the queue for the
-     * request.
+     * Send a request to be handled. Since this uses an unbounded queue, this 
method never blocks,
+     * ensuring EventLoop threads are never blocked by queue operations.
+     *
+     * <p>After adding the request, automatically checks if backpressure 
should be applied. If the
+     * queue size exceeds the backpressure threshold, ALL channels associated 
with this
+     * RequestChannel will be paused to prevent further memory growth.
+     *
+     * <p>OPTIMIZATION: Only check backpressure if not already active (avoid 
redundant checks).
      */
-    public void putRequest(RpcRequest request) throws Exception {
-        requestQueue.put(request);
+    public void putRequest(RpcRequest request) {
+        requestQueue.add(request);
+
+        // CRITICAL OPTIMIZATION: Skip check if already in backpressure state.
+        // This avoids lock contention on every putRequest() call when system 
is under pressure.
+        // The volatile read is very cheap compared to lock acquisition.
+        if (!isBackpressureActive) {
+            pauseAllChannelsIfNeeded();
+        }
     }
 
     /**
      * Sends a shutdown request to the channel. This can allow request 
processor gracefully
      * shutdown.
      */
-    public void putShutdownRequest() throws Exception {
+    public void putShutdownRequest() {
         putRequest(ShutdownRequest.INSTANCE);
     }
 
     /**
-     * Get the next request or block until specified time has elapsed.
+     * Get the next request, waiting up to the specified timeout if the queue 
is empty. After
+     * successfully polling a request, attempts to resume paused channels if 
the queue size has
+     * dropped below the resume threshold.
      *
      * @return the head of this queue, or null if the specified waiting time 
elapses before an
      *     element is available.
      */
     public RpcRequest pollRequest(long timeoutMs) {
         try {
-            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+            RpcRequest request = requestQueue.poll(timeoutMs, 
TimeUnit.MILLISECONDS);
+            if (isBackpressureActive) {
+                tryResumeChannels();
+            }
+            return request;
         } catch (InterruptedException e) {
-            LOG.warn("Interrupted while polling requests from channel queue.", 
e);
+            Thread.currentThread().interrupt();
             return null;
         }
     }
 
     /** Get the number of requests in the queue. */
-    int requestsCount() {
+    public int requestsCount() {
         return requestQueue.size();
     }
+
+    /**
+     * Registers a Netty channel as being associated with this RequestChannel. 
This is called when a
+     * channel becomes active and is hashed to this RequestChannel.
+     *
+     * <p>IMPORTANT: New channels are NOT immediately paused even if 
backpressure is active. This is
+     * critical for system health: 1. Health check connections must not be 
blocked at startup 2. New
+     * connections will naturally be controlled by the next backpressure check 
3. Immediately
+     * pausing new connections can cause deadlock at startup when queue is 
full but processing is
+     * slow
+     *
+     * @param channel the channel to register
+     */
+    public void registerChannel(Channel channel) {
+        associatedChannels.add(channel);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Registered channel {} to RequestChannel (backpressure 
threshold: {}, associated channels: {}, backpressure active: {})",
+                    channel.remoteAddress(),
+                    backpressureThreshold,
+                    associatedChannels.size(),
+                    isBackpressureActive);
+        }
+    }
+
+    /**
+     * Unregisters a Netty channel from this RequestChannel. This is called 
when a channel becomes
+     * inactive.
+     *
+     * @param channel the channel to unregister
+     */
+    public void unregisterChannel(Channel channel) {
+        associatedChannels.remove(channel);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Unregistered channel {} from RequestChannel (associated 
channels: {}, backpressure active: {})",
+                    channel.remoteAddress(),
+                    associatedChannels.size(),
+                    isBackpressureActive);
+        }
+    }
+
+    /**
+     * Check if the queue size has reached the backpressure threshold. When 
true, channel reads
+     * should be paused to prevent memory exhaustion.
+     */
+    private boolean shouldApplyBackpressure() {
+        return requestQueue.size() >= backpressureThreshold;
+    }
+
+    /**
+     * Check if the queue size is at or below the resume threshold. When 
true, paused channels
+     * can be resumed to accept new requests.
+     */
+    private boolean shouldResumeChannels() {
+        return requestQueue.size() <= resumeThreshold;
+    }
+
+    /**
+     * Pauses ALL channels associated with this RequestChannel if the queue 
size exceeds the
+     * backpressure threshold. This ensures that when the queue is full, all 
channels stop sending
+     * requests to prevent memory exhaustion.
+     *
+     * <p>Uses a lock to protect the entire operation (state check + state 
change + task submission)
+     * as an atomic unit. This prevents race conditions with resume operations; 
channel
+     * registration is not guarded, as registerChannel() takes no lock.
+     *
+     * <p>TODO: In the future, consider pausing only a subset of channels 
instead of all channels to
+     * reduce the impact on upstream traffic. A selective pause strategy could 
minimize disruption
+     * to the overall system while still providing effective backpressure 
control.
+     */
+    private void pauseAllChannelsIfNeeded() {
+        if (!shouldApplyBackpressure()) {
+            return;
+        }
+
+        // Lock protects: state check + state change + task submission as 
atomic operation
+        backpressureLock.lock();
+        try {
+            // Check if already in backpressure state
+            if (isBackpressureActive) {
+                return; // Already paused, nothing to do
+            }
+
+            // Activate backpressure and pause all channels
+            isBackpressureActive = true;
+
+            for (Channel channel : associatedChannels) {
+                if (channel.isActive()) {
+                    // Submit to the channel's EventLoop to ensure thread 
safety
+                    channel.eventLoop()
+                            .execute(
+                                    () -> {
+                                        if (channel.isActive() && 
channel.config().isAutoRead()) {
+                                            
channel.config().setAutoRead(false);
+                                            LOG.warn(
+                                                    "Queue size ({}) exceeded 
backpressure threshold ({}), paused channel: {}",
+                                                    requestsCount(),
+                                                    backpressureThreshold,
+                                                    channel.remoteAddress());
+                                        }
+                                    });
+                }
+            }
+        } finally {
+            backpressureLock.unlock();
+        }
+    }
+
+    /**
+     * Attempts to resume all associated channels if the queue size has 
dropped below the resume
+     * threshold. This method is called automatically after a request is 
dequeued.
+     *
+     * <p>Uses a lock to protect the entire operation (state check + state 
change + task submission)
+     * as an atomic unit. This prevents race conditions with pause operations; 
channel
+     * registration is not guarded, as registerChannel() takes no lock.
+     */
+    private void tryResumeChannels() {
+        if (!shouldResumeChannels()) {
+            return;
+        }
+
+        // Lock protects: state check + state change + task submission as 
atomic operation
+        backpressureLock.lock();
+        try {
+            // Check if backpressure is not active
+            if (!isBackpressureActive) {
+                return; // Already resumed, nothing to do
+            }
+
+            // Deactivate backpressure and resume all channels
+            isBackpressureActive = false;
+
+            for (Channel channel : associatedChannels) {
+                if (channel.isActive()) {
+                    // Submit resume task to the channel's EventLoop to ensure 
thread safety
+                    channel.eventLoop()
+                            .execute(
+                                    () -> {
+                                        if (channel.isActive() && 
!channel.config().isAutoRead()) {
+                                            channel.config().setAutoRead(true);
+                                            LOG.info(
+                                                    "Queue size ({}) dropped 
below resume threshold ({}), resumed channel: {}",
+                                                    requestsCount(),
+                                                    resumeThreshold,
+                                                    channel.remoteAddress());
+                                        }
+                                    });
+                }
+            }
+        } finally {
+            backpressureLock.unlock();
+        }
+    }
 }
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
index 7509df095..d71b01665 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
@@ -22,14 +22,38 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest;
 import org.apache.fluss.rpc.netty.server.FlussRequest;
 import org.apache.fluss.rpc.netty.server.RequestChannel;
 import org.apache.fluss.rpc.netty.server.RpcRequest;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.EmptyByteBuf;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelConfig;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelId;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelMetadata;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.fluss.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.fluss.shaded.netty4.io.netty.channel.MessageSizeEstimator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.RecvByteBufAllocator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.WriteBufferWaterMark;
+import org.apache.fluss.shaded.netty4.io.netty.util.Attribute;
+import org.apache.fluss.shaded.netty4.io.netty.util.AttributeKey;
+import 
org.apache.fluss.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
+import 
org.apache.fluss.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor;
 
 import org.junit.jupiter.api.Test;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -100,4 +124,517 @@ public class RequestChannelTest {
         rpcRequest = channel.pollRequest(100);
         assertThat(rpcRequest).isEqualTo(rpcRequest2);
     }
+
+    /**
+     * Test that backpressure is activated when queue size exceeds threshold 
and deactivated when
+     * queue size drops below resume threshold. This verifies the complete 
backpressure lifecycle.
+     */
+    @Test
+    void testBackpressureActivationAndDeactivation() throws Exception {
+        int backpressureThreshold = 100;
+        int resumeThreshold = 50; // 50% of backpressureThreshold
+        RequestChannel channel = new RequestChannel(backpressureThreshold);
+
+        // Create a test channel that can track autoRead state changes
+        TestChannel testChannel = new TestChannel();
+        channel.registerChannel(testChannel);
+
+        // Initially, channel should have autoRead enabled
+        assertThat(testChannel.isAutoRead()).isTrue();
+
+        // Step 1: Fill queue to trigger backpressure
+        // Add requests until we reach the backpressure threshold
+        for (int i = 0; i < backpressureThreshold; i++) {
+            channel.putRequest(createTestRequest(i));
+        }
+
+        // Wait for backpressure to be applied (channel operations are async 
via eventLoop)
+        // The backpressure should be triggered when we add the threshold-th 
request
+        testChannel.waitForAutoReadChange(false, 2, TimeUnit.SECONDS);
+
+        // Verify backpressure is active: autoRead should be false
+        assertThat(testChannel.isAutoRead()).isFalse();
+        
assertThat(channel.requestsCount()).isGreaterThanOrEqualTo(backpressureThreshold);
+
+        // Step 2: Consume requests until queue size drops below resume 
threshold
+        // The queue holds 100 requests; 50 polls already bring it to <=50.
+        // One extra poll (100 - 50 + 1 = 51) leaves it strictly below 50.
+        int requestsToConsume = backpressureThreshold - resumeThreshold + 1;
+        for (int i = 0; i < requestsToConsume; i++) {
+            RpcRequest request = channel.pollRequest(10);
+            assertThat(request).isNotNull();
+        }
+
+        // Wait for backpressure to be released
+        testChannel.waitForAutoReadChange(true, 2, TimeUnit.SECONDS);
+
+        // Verify backpressure is released: autoRead should be true again
+        assertThat(testChannel.isAutoRead()).isTrue();
+        
assertThat(channel.requestsCount()).isLessThanOrEqualTo(resumeThreshold);
+
+        // Clean up
+        channel.unregisterChannel(testChannel);
+    }
+
+    /** Helper method to create a test RpcRequest with a unique identifier. */
+    private RpcRequest createTestRequest(int id) {
+        return new FlussRequest(
+                ApiKeys.GET_TABLE_INFO.id,
+                (short) 0,
+                id,
+                null,
+                new GetTableInfoRequest(),
+                new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
+                "FLUSS",
+                true,
+                null,
+                null,
+                new CompletableFuture<>());
+    }
+
+    /**
+     * A test Channel implementation that tracks autoRead state changes for 
backpressure testing.
+     */
+    private static class TestChannel implements Channel {
+        private final AtomicBoolean autoRead = new AtomicBoolean(true);
+        private final TestChannelConfig config = new TestChannelConfig(this);
+        private final TestEventLoop eventLoop = new TestEventLoop();
+        private final ChannelId channelId = new TestChannelId();
+        private final SocketAddress remoteAddress = new 
InetSocketAddress("localhost", 8080);
+
+        @Override
+        public ChannelConfig config() {
+            return config;
+        }
+
+        @Override
+        public EventLoop eventLoop() {
+            return eventLoop;
+        }
+
+        @Override
+        public ChannelId id() {
+            return channelId;
+        }
+
+        @Override
+        public SocketAddress remoteAddress() {
+            return remoteAddress;
+        }
+
+        @Override
+        public boolean isActive() {
+            return true;
+        }
+
+        boolean isAutoRead() {
+            return autoRead.get();
+        }
+
+        void waitForAutoReadChange(boolean expectedValue, long timeout, 
TimeUnit unit)
+                throws InterruptedException {
+            long deadline = System.nanoTime() + unit.toNanos(timeout);
+            while (autoRead.get() != expectedValue && System.nanoTime() < 
deadline) {
+                Thread.sleep(10);
+            }
+            assertThat(autoRead.get())
+                    .as("AutoRead should be " + expectedValue + " within 
timeout")
+                    .isEqualTo(expectedValue);
+        }
+
+        // Minimal implementation of other required methods
+        @Override
+        public ChannelPipeline pipeline() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBufAllocator alloc() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Channel read() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Channel flush() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture write(Object msg) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture write(Object msg, ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelPromise newPromise() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture newSucceededFuture() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture newFailedFuture(Throwable cause) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isWritable() {
+            return true;
+        }
+
+        @Override
+        public long bytesBeforeUnwritable() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public long bytesBeforeWritable() {
+            return 0;
+        }
+
+        @Override
+        public Unsafe unsafe() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture closeFuture() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return true;
+        }
+
+        @Override
+        public boolean isRegistered() {
+            return true;
+        }
+
+        @Override
+        public SocketAddress localAddress() {
+            return new InetSocketAddress("localhost", 0);
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture connect(
+                SocketAddress remoteAddress, SocketAddress localAddress, 
ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture disconnect() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture disconnect(ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture close() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture close(ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture deregister() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelFuture deregister(ChannelPromise promise) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Channel parent() {
+            return null;
+        }
+
+        @Override
+        public ChannelProgressivePromise newProgressivePromise() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ChannelMetadata metadata() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> Attribute<T> attr(AttributeKey<T> key) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> boolean hasAttr(AttributeKey<T> key) {
+            return false;
+        }
+
+        @Override
+        public int compareTo(Channel o) {
+            return id().asLongText().compareTo(o.id().asLongText());
+        }
+
+        /** Test ChannelConfig that tracks autoRead state. */
+        private static class TestChannelConfig implements ChannelConfig {
+            private final TestChannel channel;
+
+            TestChannelConfig(TestChannel channel) {
+                this.channel = channel;
+            }
+
+            @Override
+            public boolean isAutoRead() {
+                return channel.autoRead.get();
+            }
+
+            @Override
+            public ChannelConfig setAutoRead(boolean autoRead) {
+                channel.autoRead.set(autoRead);
+                return this;
+            }
+
+            @Override
+            public boolean isAutoClose() {
+                return false;
+            }
+
+            @Override
+            public ChannelConfig setAutoClose(boolean autoClose) {
+                return this;
+            }
+
+            // Minimal implementation of other required methods
+            @Override
+            public <T> T getOption(ChannelOption<T> option) {
+                return null;
+            }
+
+            @Override
+            public <T> boolean setOption(ChannelOption<T> option, T value) {
+                return false;
+            }
+
+            @Override
+            public Map<ChannelOption<?>, Object> getOptions() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public boolean setOptions(Map<ChannelOption<?>, ?> options) {
+                return false;
+            }
+
+            @Override
+            public int getConnectTimeoutMillis() {
+                return 0;
+            }
+
+            @Override
+            public ChannelConfig setConnectTimeoutMillis(int 
connectTimeoutMillis) {
+                return this;
+            }
+
+            @Override
+            public int getMaxMessagesPerRead() {
+                return 1;
+            }
+
+            @Override
+            public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) 
{
+                return this;
+            }
+
+            @Override
+            public int getWriteSpinCount() {
+                return 16;
+            }
+
+            @Override
+            public ChannelConfig setWriteSpinCount(int writeSpinCount) {
+                return this;
+            }
+
+            @Override
+            public ByteBufAllocator getAllocator() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelConfig setAllocator(ByteBufAllocator allocator) {
+                return this;
+            }
+
+            @Override
+            public RecvByteBufAllocator getRecvByteBufAllocator() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator 
allocator) {
+                return this;
+            }
+
+            @Override
+            public int getWriteBufferHighWaterMark() {
+                return 0;
+            }
+
+            @Override
+            public ChannelConfig setWriteBufferHighWaterMark(int 
writeBufferHighWaterMark) {
+                return this;
+            }
+
+            @Override
+            public int getWriteBufferLowWaterMark() {
+                return 0;
+            }
+
+            @Override
+            public ChannelConfig setWriteBufferLowWaterMark(int 
writeBufferLowWaterMark) {
+                return this;
+            }
+
+            @Override
+            public WriteBufferWaterMark getWriteBufferWaterMark() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelConfig setWriteBufferWaterMark(
+                    WriteBufferWaterMark writeBufferWaterMark) {
+                return this;
+            }
+
+            @Override
+            public MessageSizeEstimator getMessageSizeEstimator() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator 
estimator) {
+                return this;
+            }
+        }
+
+        /** Test EventLoop that executes tasks immediately in the current 
thread for testing. */
+        private static class TestEventLoop extends SingleThreadEventExecutor 
implements EventLoop {
+            TestEventLoop() {
+                super(null, new DefaultThreadFactory("test"), false);
+            }
+
+            @Override
+            protected void run() {
+                // No-op, tasks are executed synchronously
+            }
+
+            @Override
+            public void execute(Runnable task) {
+                // Execute immediately in current thread for testing
+                task.run();
+            }
+
+            @Override
+            public EventLoopGroup parent() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public EventLoop next() {
+                return this;
+            }
+
+            @Override
+            public ChannelFuture register(Channel channel) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelFuture register(Channel channel, ChannelPromise 
promise) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ChannelPromise register(ChannelPromise promise) {
+                throw new UnsupportedOperationException();
+            }
+        }
+
+        /** Simple ChannelId implementation for testing. */
+        private static class TestChannelId implements ChannelId {
+            @Override
+            public String asShortText() {
+                return "test-channel";
+            }
+
+            @Override
+            public String asLongText() {
+                return "test-channel-long";
+            }
+
+            @Override
+            public int compareTo(ChannelId o) {
+                return asLongText().compareTo(o.asLongText());
+            }
+        }
+    }
 }


Reply via email to