This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4ce849f95 [rpc] Prevent EventLoop blocking and implement TCP-level
backpressure in RequestChannel (#2065)
4ce849f95 is described below
commit 4ce849f950504ccb7941d2816cd8a5413f613950
Author: Yang Wang <[email protected]>
AuthorDate: Tue Dec 2 19:45:33 2025 +0800
[rpc] Prevent EventLoop blocking and implement TCP-level backpressure in
RequestChannel (#2065)
---
.../fluss/rpc/netty/server/NettyServerHandler.java | 9 +
.../fluss/rpc/netty/server/RequestChannel.java | 260 +++++++++-
.../fluss/rpc/protocol/RequestChannelTest.java | 537 +++++++++++++++++++++
3 files changed, 793 insertions(+), 13 deletions(-)
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
index 5501ba4c9..3d22c3523 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java
@@ -170,12 +170,21 @@ public final class NettyServerHandler extends
ChannelInboundHandlerAdapter {
authenticator.isCompleted()
? ConnectionState.READY
: ConnectionState.AUTHENTICATING);
+
+ // Register this channel with its RequestChannel. The RequestChannel
will manage this
+ // channel's lifecycle and backpressure state.
+ requestChannel.registerChannel(ctx.channel());
+
// TODO: connection metrics (count, client tags, receive request avg
idle time, etc.)
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
+
+ // Unregister this channel from its RequestChannel. The RequestChannel
will clean up both
+ // the association and any paused state.
+ requestChannel.unregisterChannel(ctx.channel());
}
@Override
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
index 94cdec1c6..f855c89e5 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestChannel.java
@@ -17,59 +17,293 @@
package org.apache.fluss.rpc.netty.server;
+import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
-/** A blocking queue channel that can receive requests and send responses. */
+/**
+ * A queue channel that can receive requests and send responses.
+ *
+ * <p>Uses an unbounded LinkedBlockingQueue to ensure that putRequest() never
+ * blocks, preventing EventLoop threads from being blocked. Backpressure is
+ * applied at the TCP level by pausing channel reads once the queue size
+ * reaches the backpressure threshold.
+ *
+ * <p>Each RequestChannel instance manages its own associated Netty channels
(those hashed to this
+ * RequestChannel) and independently controls their backpressure state. This
design encapsulates all
+ * backpressure logic within the RequestChannel, eliminating the need for
global state management.
+ */
@ThreadSafe
public class RequestChannel {
private static final Logger LOG =
LoggerFactory.getLogger(RequestChannel.class);
+ /** Unbounded blocking queue to hold incoming requests. Never blocks on
put. */
protected final BlockingQueue<RpcRequest> requestQueue;
- public RequestChannel(int queueCapacity) {
- this.requestQueue = new ArrayBlockingQueue<>(queueCapacity);
+ /**
+ * The threshold at which backpressure should be applied (pausing channel
reads). When queue
+ * size exceeds this value, channels should be paused to prevent memory
exhaustion.
+ */
+ private final int backpressureThreshold;
+
+ /**
+ * The threshold at which to resume paused channels. Set to 50% of
backpressureThreshold to
+ * provide hysteresis and avoid thrashing.
+ */
+ private final int resumeThreshold;
+
+ /**
+ * All Netty channels that are hashed to this RequestChannel. Channels are
registered when they
+ * become active and unregistered when they become inactive.
+ *
+ * <p>When backpressure is applied, ALL channels in this set are paused
simultaneously. When
+ * backpressure is released, ALL channels are resumed simultaneously.
+ */
+ private final Set<Channel> associatedChannels =
ConcurrentHashMap.newKeySet();
+
+ /**
+ * Indicates whether backpressure is currently active. When true, all
associated channels have
+ * been paused (setAutoRead(false)). When false, all channels are running
normally.
+ *
+ * <p>Volatile ensures visibility for fast-path reads (outside the lock).
All modifications are
+ * protected by backpressureLock, so atomicity is guaranteed by the lock,
not by atomic
+ * operations.
+ */
+ private volatile boolean isBackpressureActive = false;
+
+ /**
+ * Lock to protect backpressure state transitions and task submissions.
+ * This lock ensures that:
+ *
+ * <ol>
+ *   <li>State checks and task submissions are atomic (preventing permanent
+ *       channel blocking).
+ *   <li>Pause and resume operations are mutually exclusive.
+ * </ol>
+ *
+ * <p>Note: registerChannel() does not acquire this lock; a channel that is
+ * registered while backpressure is active is not paused until a later
+ * pause cycle.
+ *
+ * <p>The lock eliminates the need for CAS operations - simple boolean checks
+ * and assignments under the lock are sufficient for correctness.
+ */
+ private final ReentrantLock backpressureLock = new ReentrantLock();
+
+ public RequestChannel(int backpressureThreshold) {
+ this.requestQueue = new LinkedBlockingQueue<>();
+ this.backpressureThreshold = backpressureThreshold;
+ this.resumeThreshold = backpressureThreshold / 2;
}
/**
- * Send a request to be handled, potentially blocking until there is room
in the queue for the
- * request.
+ * Send a request to be handled. Since this uses an unbounded queue, this
method never blocks,
+ * ensuring EventLoop threads are never blocked by queue operations.
+ *
+ * <p>After adding the request, automatically checks if backpressure
should be applied. If the
+ * queue size exceeds the backpressure threshold, ALL channels associated
with this
+ * RequestChannel will be paused to prevent further memory growth.
+ *
+ * <p>OPTIMIZATION: Only check backpressure if not already active (avoid
redundant checks).
*/
- public void putRequest(RpcRequest request) throws Exception {
- requestQueue.put(request);
+ public void putRequest(RpcRequest request) {
+ requestQueue.add(request);
+
+ // CRITICAL OPTIMIZATION: Skip check if already in backpressure state.
+ // This avoids lock contention on every putRequest() call when system
is under pressure.
+ // The volatile read is very cheap compared to lock acquisition.
+ if (!isBackpressureActive) {
+ pauseAllChannelsIfNeeded();
+ }
}
/**
* Sends a shutdown request to the channel. This can allow request
processor gracefully
* shutdown.
*/
- public void putShutdownRequest() throws Exception {
+ public void putShutdownRequest() {
putRequest(ShutdownRequest.INSTANCE);
}
/**
- * Get the next request or block until specified time has elapsed.
+ * Get the next request, waiting up to the specified timeout if the queue
is empty. After
+ * successfully polling a request, attempts to resume paused channels if
the queue size has
+ * dropped below the resume threshold.
*
* @return the head of this queue, or null if the specified waiting time
elapses before an
* element is available.
*/
public RpcRequest pollRequest(long timeoutMs) {
try {
- return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ RpcRequest request = requestQueue.poll(timeoutMs,
TimeUnit.MILLISECONDS);
+ if (isBackpressureActive) {
+ tryResumeChannels();
+ }
+ return request;
} catch (InterruptedException e) {
- LOG.warn("Interrupted while polling requests from channel queue.",
e);
+ Thread.currentThread().interrupt();
return null;
}
}
/** Get the number of requests in the queue. */
- int requestsCount() {
+ public int requestsCount() {
return requestQueue.size();
}
+
+ /**
+ * Registers a Netty channel as being associated with this RequestChannel.
This is called when a
+ * channel becomes active and is hashed to this RequestChannel.
+ *
+ * <p>IMPORTANT: New channels are NOT immediately paused even if backpressure
+ * is active. This is critical for system health:
+ *
+ * <ol>
+ *   <li>Health check connections must not be blocked at startup.
+ *   <li>New connections will naturally be controlled by the next
+ *       backpressure check.
+ *   <li>Immediately pausing new connections can cause deadlock at startup
+ *       when queue is full but processing is slow.
+ * </ol>
+ *
+ * @param channel the channel to register
+ */
+ public void registerChannel(Channel channel) {
+ associatedChannels.add(channel);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Registered channel {} to RequestChannel (backpressure
threshold: {}, associated channels: {}, backpressure active: {})",
+ channel.remoteAddress(),
+ backpressureThreshold,
+ associatedChannels.size(),
+ isBackpressureActive);
+ }
+ }
+
+ /**
+ * Unregisters a Netty channel from this RequestChannel. This is called
when a channel becomes
+ * inactive.
+ *
+ * @param channel the channel to unregister
+ */
+ public void unregisterChannel(Channel channel) {
+ associatedChannels.remove(channel);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Unregistered channel {} from RequestChannel (associated
channels: {}, backpressure active: {})",
+ channel.remoteAddress(),
+ associatedChannels.size(),
+ isBackpressureActive);
+ }
+ }
+
+ /**
+ * Check if the queue size has reached or exceeded the backpressure
+ * threshold. When true, channel reads should be paused to prevent memory
+ * exhaustion.
+ */
+ private boolean shouldApplyBackpressure() {
+ return requestQueue.size() >= backpressureThreshold;
+ }
+
+ /**
+ * Check if the queue size has dropped to or below the resume threshold.
+ * When true, paused channels can be resumed to accept new requests.
+ */
+ private boolean shouldResumeChannels() {
+ return requestQueue.size() <= resumeThreshold;
+ }
+
+ /**
+ * Pauses ALL channels associated with this RequestChannel if the queue
size exceeds the
+ * backpressure threshold. This ensures that when the queue is full, all
channels stop sending
+ * requests to prevent memory exhaustion.
+ *
+ * <p>Uses a lock to protect the entire operation (state check + state
change + task submission)
+ * as an atomic unit. This prevents race conditions with resume operations
and channel
+ * registrations.
+ *
+ * <p>TODO: In the future, consider pausing only a subset of channels
instead of all channels to
+ * reduce the impact on upstream traffic. A selective pause strategy could
minimize disruption
+ * to the overall system while still providing effective backpressure
control.
+ */
+ private void pauseAllChannelsIfNeeded() {
+ if (!shouldApplyBackpressure()) {
+ return;
+ }
+
+ // Lock protects: state check + state change + task submission as
atomic operation
+ backpressureLock.lock();
+ try {
+ // Check if already in backpressure state
+ if (isBackpressureActive) {
+ return; // Already paused, nothing to do
+ }
+
+ // Activate backpressure and pause all channels
+ isBackpressureActive = true;
+
+ for (Channel channel : associatedChannels) {
+ if (channel.isActive()) {
+ // Submit to the channel's EventLoop to ensure thread
safety
+ channel.eventLoop()
+ .execute(
+ () -> {
+ if (channel.isActive() &&
channel.config().isAutoRead()) {
+
channel.config().setAutoRead(false);
+ LOG.warn(
+ "Queue size ({}) exceeded
backpressure threshold ({}), paused channel: {}",
+ requestsCount(),
+ backpressureThreshold,
+ channel.remoteAddress());
+ }
+ });
+ }
+ }
+ } finally {
+ backpressureLock.unlock();
+ }
+ }
+
+ /**
+ * Attempts to resume all associated channels if the queue size has
dropped below the resume
+ * threshold. This method is called automatically after a request is
dequeued.
+ *
+ * <p>Uses a lock to protect the entire operation (state check + state
change + task submission)
+ * as an atomic unit. This prevents race conditions with pause operations
and channel
+ * registrations.
+ */
+ private void tryResumeChannels() {
+ if (!shouldResumeChannels()) {
+ return;
+ }
+
+ // Lock protects: state check + state change + task submission as
atomic operation
+ backpressureLock.lock();
+ try {
+ // Check if backpressure is not active
+ if (!isBackpressureActive) {
+ return; // Already resumed, nothing to do
+ }
+
+ // Deactivate backpressure and resume all channels
+ isBackpressureActive = false;
+
+ for (Channel channel : associatedChannels) {
+ if (channel.isActive()) {
+ // Submit resume task to the channel's EventLoop to ensure
thread safety
+ channel.eventLoop()
+ .execute(
+ () -> {
+ if (channel.isActive() &&
!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ LOG.info(
+ "Queue size ({}) dropped
below resume threshold ({}), resumed channel: {}",
+ requestsCount(),
+ resumeThreshold,
+ channel.remoteAddress());
+ }
+ });
+ }
+ }
+ } finally {
+ backpressureLock.unlock();
+ }
+ }
}
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
index 7509df095..d71b01665 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/protocol/RequestChannelTest.java
@@ -22,14 +22,38 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest;
import org.apache.fluss.rpc.netty.server.FlussRequest;
import org.apache.fluss.rpc.netty.server.RequestChannel;
import org.apache.fluss.rpc.netty.server.RpcRequest;
+import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.fluss.shaded.netty4.io.netty.buffer.EmptyByteBuf;
import org.apache.fluss.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.Channel;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelConfig;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelId;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelMetadata;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelPipeline;
+import
org.apache.fluss.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoop;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.fluss.shaded.netty4.io.netty.channel.MessageSizeEstimator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.RecvByteBufAllocator;
+import org.apache.fluss.shaded.netty4.io.netty.channel.WriteBufferWaterMark;
+import org.apache.fluss.shaded.netty4.io.netty.util.Attribute;
+import org.apache.fluss.shaded.netty4.io.netty.util.AttributeKey;
+import
org.apache.fluss.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
+import
org.apache.fluss.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.junit.jupiter.api.Test;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
@@ -100,4 +124,517 @@ public class RequestChannelTest {
rpcRequest = channel.pollRequest(100);
assertThat(rpcRequest).isEqualTo(rpcRequest2);
}
+
+ /**
+ * Test that backpressure is activated when queue size exceeds threshold
and deactivated when
+ * queue size drops below resume threshold. This verifies the complete
backpressure lifecycle.
+ */
+ @Test
+ void testBackpressureActivationAndDeactivation() throws Exception {
+ int backpressureThreshold = 100;
+ int resumeThreshold = 50; // 50% of backpressureThreshold
+ RequestChannel channel = new RequestChannel(backpressureThreshold);
+
+ // Create a test channel that can track autoRead state changes
+ TestChannel testChannel = new TestChannel();
+ channel.registerChannel(testChannel);
+
+ // Initially, channel should have autoRead enabled
+ assertThat(testChannel.isAutoRead()).isTrue();
+
+ // Step 1: Fill queue to trigger backpressure
+ // Add requests until we exceed the backpressure threshold
+ for (int i = 0; i < backpressureThreshold; i++) {
+ channel.putRequest(createTestRequest(i));
+ }
+
+ // Wait for backpressure to be applied (channel operations are async
via eventLoop)
+ // The backpressure should be triggered when we add the threshold-th
request
+ testChannel.waitForAutoReadChange(false, 2, TimeUnit.SECONDS);
+
+ // Verify backpressure is active: autoRead should be false
+ assertThat(testChannel.isAutoRead()).isFalse();
+
assertThat(channel.requestsCount()).isGreaterThanOrEqualTo(backpressureThreshold);
+
+ // Step 2: Consume requests until queue size drops to or below the
+ // resume threshold. The queue holds 100 requests and must shrink to
+ // <= 50, so consuming 100 - 50 + 1 = 51 requests leaves 49 queued.
+ int requestsToConsume = backpressureThreshold - resumeThreshold + 1;
+ for (int i = 0; i < requestsToConsume; i++) {
+ RpcRequest request = channel.pollRequest(10);
+ assertThat(request).isNotNull();
+ }
+
+ // Wait for backpressure to be released
+ testChannel.waitForAutoReadChange(true, 2, TimeUnit.SECONDS);
+
+ // Verify backpressure is released: autoRead should be true again
+ assertThat(testChannel.isAutoRead()).isTrue();
+
assertThat(channel.requestsCount()).isLessThanOrEqualTo(resumeThreshold);
+
+ // Clean up
+ channel.unregisterChannel(testChannel);
+ }
+
+ /** Helper method to create a test RpcRequest with a unique identifier. */
+ private RpcRequest createTestRequest(int id) {
+ return new FlussRequest(
+ ApiKeys.GET_TABLE_INFO.id,
+ (short) 0,
+ id,
+ null,
+ new GetTableInfoRequest(),
+ new EmptyByteBuf(new UnpooledByteBufAllocator(true, true)),
+ "FLUSS",
+ true,
+ null,
+ null,
+ new CompletableFuture<>());
+ }
+
+ /**
+ * A test Channel implementation that tracks autoRead state changes for
backpressure testing.
+ */
+ private static class TestChannel implements Channel {
+ private final AtomicBoolean autoRead = new AtomicBoolean(true);
+ private final TestChannelConfig config = new TestChannelConfig(this);
+ private final TestEventLoop eventLoop = new TestEventLoop();
+ private final ChannelId channelId = new TestChannelId();
+ private final SocketAddress remoteAddress = new
InetSocketAddress("localhost", 8080);
+
+ @Override
+ public ChannelConfig config() {
+ return config;
+ }
+
+ @Override
+ public EventLoop eventLoop() {
+ return eventLoop;
+ }
+
+ @Override
+ public ChannelId id() {
+ return channelId;
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return remoteAddress;
+ }
+
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+
+ boolean isAutoRead() {
+ return autoRead.get();
+ }
+
+ void waitForAutoReadChange(boolean expectedValue, long timeout,
TimeUnit unit)
+ throws InterruptedException {
+ long deadline = System.nanoTime() + unit.toNanos(timeout);
+ while (autoRead.get() != expectedValue && System.nanoTime() <
deadline) {
+ Thread.sleep(10);
+ }
+ assertThat(autoRead.get())
+ .as("AutoRead should be " + expectedValue + " within
timeout")
+ .isEqualTo(expectedValue);
+ }
+
+ // Minimal implementation of other required methods
+ @Override
+ public ChannelPipeline pipeline() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Channel read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Channel flush() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise)
{
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isWritable() {
+ return true;
+ }
+
+ @Override
+ public long bytesBeforeUnwritable() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long bytesBeforeWritable() {
+ return 0;
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture closeFuture() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return true;
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return new InetSocketAddress("localhost", 0);
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise
promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture connect(
+ SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture close() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Channel parent() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
+ }
+
+ @Override
+ public int compareTo(Channel o) {
+ return id().asLongText().compareTo(o.id().asLongText());
+ }
+
+ /** Test ChannelConfig that tracks autoRead state. */
+ private static class TestChannelConfig implements ChannelConfig {
+ private final TestChannel channel;
+
+ TestChannelConfig(TestChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public boolean isAutoRead() {
+ return channel.autoRead.get();
+ }
+
+ @Override
+ public ChannelConfig setAutoRead(boolean autoRead) {
+ channel.autoRead.set(autoRead);
+ return this;
+ }
+
+ @Override
+ public boolean isAutoClose() {
+ return false;
+ }
+
+ @Override
+ public ChannelConfig setAutoClose(boolean autoClose) {
+ return this;
+ }
+
+ // Minimal implementation of other required methods
+ @Override
+ public <T> T getOption(ChannelOption<T> option) {
+ return null;
+ }
+
+ @Override
+ public <T> boolean setOption(ChannelOption<T> option, T value) {
+ return false;
+ }
+
+ @Override
+ public Map<ChannelOption<?>, Object> getOptions() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setOptions(Map<ChannelOption<?>, ?> options) {
+ return false;
+ }
+
+ @Override
+ public int getConnectTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public ChannelConfig setConnectTimeoutMillis(int
connectTimeoutMillis) {
+ return this;
+ }
+
+ @Override
+ public int getMaxMessagesPerRead() {
+ return 1;
+ }
+
+ @Override
+ public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead)
{
+ return this;
+ }
+
+ @Override
+ public int getWriteSpinCount() {
+ return 16;
+ }
+
+ @Override
+ public ChannelConfig setWriteSpinCount(int writeSpinCount) {
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocator getAllocator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfig setAllocator(ByteBufAllocator allocator) {
+ return this;
+ }
+
+ @Override
+ public RecvByteBufAllocator getRecvByteBufAllocator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator
allocator) {
+ return this;
+ }
+
+ @Override
+ public int getWriteBufferHighWaterMark() {
+ return 0;
+ }
+
+ @Override
+ public ChannelConfig setWriteBufferHighWaterMark(int
writeBufferHighWaterMark) {
+ return this;
+ }
+
+ @Override
+ public int getWriteBufferLowWaterMark() {
+ return 0;
+ }
+
+ @Override
+ public ChannelConfig setWriteBufferLowWaterMark(int
writeBufferLowWaterMark) {
+ return this;
+ }
+
+ @Override
+ public WriteBufferWaterMark getWriteBufferWaterMark() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfig setWriteBufferWaterMark(
+ WriteBufferWaterMark writeBufferWaterMark) {
+ return this;
+ }
+
+ @Override
+ public MessageSizeEstimator getMessageSizeEstimator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator
estimator) {
+ return this;
+ }
+ }
+
+ /** Test EventLoop that executes tasks immediately in the current
thread for testing. */
+ private static class TestEventLoop extends SingleThreadEventExecutor
implements EventLoop {
+ TestEventLoop() {
+ super(null, new DefaultThreadFactory("test"), false);
+ }
+
+ @Override
+ protected void run() {
+ // No-op, tasks are executed synchronously
+ }
+
+ @Override
+ public void execute(Runnable task) {
+ // Execute immediately in current thread for testing
+ task.run();
+ }
+
+ @Override
+ public EventLoopGroup parent() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public EventLoop next() {
+ return this;
+ }
+
+ @Override
+ public ChannelFuture register(Channel channel) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelFuture register(Channel channel, ChannelPromise
promise) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChannelPromise register(ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /** Simple ChannelId implementation for testing. */
+ private static class TestChannelId implements ChannelId {
+ @Override
+ public String asShortText() {
+ return "test-channel";
+ }
+
+ @Override
+ public String asLongText() {
+ return "test-channel-long";
+ }
+
+ @Override
+ public int compareTo(ChannelId o) {
+ return asLongText().compareTo(o.asLongText());
+ }
+ }
+ }
}