This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 4310ea8bda issue 10558: optimize performance for triple protocol(3.2) 
(#10587)
4310ea8bda is described below

commit 4310ea8bda25d935f0c476ba9c9471a99c336891
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Sat Oct 22 15:37:51 2022 +0800

    issue 10558: optimize performance for triple protocol(3.2) (#10587)
---
 .../apache/dubbo/common/BatchExecutorQueue.java    |   2 +-
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  11 ++-
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |   5 +-
 .../rpc/protocol/tri/call/TripleClientCall.java    |   7 +-
 .../tri/command/EndStreamQueueCommand.java         |   2 +-
 .../rpc/protocol/tri/command/QueuedCommand.java    |  12 ++-
 .../protocol/tri/stream/TripleClientStream.java    | 105 +++++++++++++++------
 .../protocol/tri/stream/TripleServerStream.java    |  83 +++++++++++-----
 .../transport/TripleHttp2FrameServerHandler.java   |  25 ++---
 .../protocol/tri/transport/TripleWriteQueue.java   |  54 +++++++++++
 .../rpc/protocol/tri/transport/WriteQueue.java     |  52 +++-------
 .../tri/stream/TripleClientStreamTest.java         |  12 ++-
 .../rpc/protocol/tri/transport/WriteQueueTest.java |  20 ++--
 13 files changed, 256 insertions(+), 134 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
index 10b5dfae85..4718f21feb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
@@ -74,7 +74,7 @@ public class BatchExecutorQueue<T> {
                     i++;
                 }
             }
-            if (i != 0 || !flushedOnce) {
+            if ((i != 0 || !flushedOnce) && item != null) {
                 flush(item);
             }
         } finally {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 6fba53857c..198dcf8abb 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -37,9 +37,9 @@ import 
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler
 import 
org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
 
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.handler.codec.http2.Http2FrameCodec;
 import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
 import io.netty.handler.codec.http2.Http2FrameLogger;
@@ -48,6 +48,8 @@ import io.netty.handler.codec.http2.Http2Settings;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.ssl.SslContext;
 
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -125,14 +127,15 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                     DEFAULT_MAX_HEADER_LIST_SIZE)))
             .frameLogger(SERVER_LOGGER)
             .build();
+        TripleWriteQueue writeQueue = new TripleWriteQueue();
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(
-            new ChannelInitializer<Channel>() {
+            new ChannelInitializer<Http2StreamChannel>() {
                 @Override
-                protected void initChannel(Channel ch) {
+                protected void initChannel(Http2StreamChannel ch) {
                     final ChannelPipeline p = ch.pipeline();
                     p.addLast(new TripleCommandOutBoundHandler());
                     p.addLast(new 
TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
-                        headFilters));
+                        headFilters, ch, writeQueue));
                 }
             });
         List<ChannelHandler> handlers = new ArrayList<>();
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 16b4373cc9..89f9012150 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -47,6 +47,7 @@ import 
org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
 import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
 import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
 import io.netty.util.AsciiString;
@@ -80,6 +81,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
     private final Set<Invoker<?>> invokers;
     private final ExecutorService streamExecutor;
     private final String acceptEncodings;
+    private final TripleWriteQueue writeQueue = new TripleWriteQueue();
 
     public TripleInvoker(Class<T> serviceType,
         URL url,
@@ -117,7 +119,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
             invocation.getMethodName(),
             invocation.getParameterTypes());
         ClientCall call = new TripleClientCall(connection, streamExecutor,
-            getUrl().getOrDefaultFrameworkModel());
+            getUrl().getOrDefaultFrameworkModel(), writeQueue);
 
         AsyncRpcResult result;
         try {
@@ -311,5 +313,4 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         }
         return timeout;
     }
-
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index b13af798cd..90db0098c2 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -38,6 +38,7 @@ import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
 import com.google.rpc.ErrorInfo;
 import com.google.rpc.Status;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -51,6 +52,7 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
     private final Connection connection;
     private final Executor executor;
     private final FrameworkModel frameworkModel;
+    private final TripleWriteQueue writeQueue;
     private RequestMetadata requestMetadata;
     private ClientStream stream;
     private ClientCall.Listener listener;
@@ -60,10 +62,11 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
     private boolean done;
 
     public TripleClientCall(Connection connection, Executor executor,
-        FrameworkModel frameworkModel) {
+                            FrameworkModel frameworkModel, TripleWriteQueue 
writeQueue) {
         this.connection = connection;
         this.executor = executor;
         this.frameworkModel = frameworkModel;
+        this.writeQueue= writeQueue;
     }
 
 
@@ -270,7 +273,7 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
         this.requestMetadata = metadata;
         this.listener = responseListener;
         this.stream = new TripleClientStream(frameworkModel, executor, 
connection.getChannel(),
-            this);
+            this, writeQueue);
         return new ClientCallToObserverAdapter<>(this);
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
index 77713cf1bf..9dacafd98f 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
@@ -29,6 +29,6 @@ public class EndStreamQueueCommand extends QueuedCommand {
 
     @Override
     public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
-        ctx.write(new DefaultHttp2DataFrame(true));
+        ctx.write(new DefaultHttp2DataFrame(true), promise);
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
index 9fa0b76c12..c16ac22958 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -23,6 +23,8 @@ import io.netty.channel.ChannelPromise;
 
 public abstract class QueuedCommand {
 
+    protected Channel channel;
+
     private ChannelPromise promise;
 
     public ChannelPromise promise() {
@@ -48,10 +50,18 @@ public abstract class QueuedCommand {
     public final void send(ChannelHandlerContext ctx, ChannelPromise promise) {
         if (ctx.channel().isActive()) {
             doSend(ctx, promise);
-            ctx.flush();
         }
     }
 
+    public QueuedCommand channel(Channel channel) {
+        this.channel = channel;
+        return this;
+    }
+
+    public Channel channel() {
+        return channel;
+    }
+
     public abstract void doSend(ChannelHandlerContext ctx, ChannelPromise 
promise);
 }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index a3d19abb56..b694c28bc3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -17,6 +17,17 @@
 
 package org.apache.dubbo.rpc.protocol.tri.stream;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -32,18 +43,10 @@ import 
org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
 import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
 import 
org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
 import 
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.Future;
-
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -58,56 +61,66 @@ import java.util.concurrent.Executor;
 public class TripleClientStream extends AbstractStream implements ClientStream 
{
 
     public final ClientStream.Listener listener;
-    private final WriteQueue writeQueue;
+    private final TripleWriteQueue writeQueue;
     private Deframer deframer;
     private final Channel parent;
+    private final Http2StreamChannel http2StreamChannel;
+    private boolean rst;
 
     // for test
     TripleClientStream(FrameworkModel frameworkModel,
         Executor executor,
-        WriteQueue writeQueue,
-        ClientStream.Listener listener) {
+        TripleWriteQueue writeQueue,
+        ClientStream.Listener listener,
+        Http2StreamChannel http2StreamChannel) {
         super(executor, frameworkModel);
         this.parent = null;
         this.listener = listener;
         this.writeQueue = writeQueue;
+        this.http2StreamChannel = http2StreamChannel;
     }
 
     public TripleClientStream(FrameworkModel frameworkModel,
         Executor executor,
         Channel parent,
-        ClientStream.Listener listener) {
+        ClientStream.Listener listener,
+        TripleWriteQueue writeQueue) {
         super(executor, frameworkModel);
         this.parent = parent;
         this.listener = listener;
-        this.writeQueue = createWriteQueue(parent);
+        this.writeQueue = writeQueue;
+        this.http2StreamChannel = initHttp2StreamChannel(parent);
     }
 
-    private WriteQueue createWriteQueue(Channel parent) {
-        final Http2StreamChannelBootstrap bootstrap = new 
Http2StreamChannelBootstrap(parent);
-        final Future<Http2StreamChannel> future = 
bootstrap.open().syncUninterruptibly();
+    private Http2StreamChannel initHttp2StreamChannel(Channel parent) {
+        Http2StreamChannelBootstrap bootstrap = new 
Http2StreamChannelBootstrap(parent);
+        Future<Http2StreamChannel> future = bootstrap.handler(new 
ChannelInboundHandlerAdapter() {
+                @Override
+                public void handlerAdded(ChannelHandlerContext ctx) throws 
Exception {
+                    Channel channel = ctx.channel();
+                    channel.pipeline().addLast(new 
TripleCommandOutBoundHandler());
+                    channel.pipeline().addLast(new 
TripleHttp2ClientResponseHandler(createTransportListener()));
+                    channel.closeFuture().addListener(f -> 
transportException(f.cause()));
+                }
+            }).open().syncUninterruptibly();
         if (!future.isSuccess()) {
             throw new IllegalStateException("Create remote stream failed. 
channel:" + parent);
         }
-        final Http2StreamChannel channel = future.getNow();
-        channel.pipeline()
-            .addLast(new TripleCommandOutBoundHandler())
-            .addLast(new 
TripleHttp2ClientResponseHandler(createTransportListener()));
-        channel.closeFuture()
-            .addListener(f -> transportException(f.cause()));
-        return new WriteQueue(channel);
+        return future.getNow();
     }
 
-    public void close() {
-        writeQueue.close();
-    }
 
     public ChannelFuture sendHeader(Http2Headers headers) {
         if (this.writeQueue == null) {
             // already processed at createStream()
             return parent.newFailedFuture(new IllegalStateException("Stream 
already closed"));
         }
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
         final HeaderQueueCommand headerCmd = 
HeaderQueueCommand.createHeaders(headers);
+        headerCmd.channel(http2StreamChannel);
         return writeQueue.enqueue(headerCmd).addListener(future -> {
             if (!future.isSuccess()) {
                 transportException(future.cause());
@@ -122,7 +135,13 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
     }
 
     public ChannelFuture cancelByLocal(TriRpcStatus status) {
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
         final CancelQueueCommand cmd = 
CancelQueueCommand.createCommand(Http2Error.CANCEL);
+        cmd.channel(http2StreamChannel);
+        TripleClientStream.this.rst = true;
         return this.writeQueue.enqueue(cmd, true);
     }
 
@@ -134,8 +153,13 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
 
     @Override
     public ChannelFuture sendMessage(byte[] message, int compressFlag, boolean 
eos) {
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
         final DataQueueCommand cmd = 
DataQueueCommand.createGrpcCommand(message, false,
             compressFlag);
+        cmd.channel(http2StreamChannel);
         return this.writeQueue.enqueue(cmd)
             .addListener(future -> {
                     if (!future.isSuccess()) {
@@ -156,10 +180,25 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
 
     @Override
     public ChannelFuture halfClose() {
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
         final EndStreamQueueCommand cmd = EndStreamQueueCommand.create();
+        cmd.channel(http2StreamChannel);
         return this.writeQueue.enqueue(cmd);
     }
 
+    private ChannelFuture preCheck() {
+        if (!http2StreamChannel.isActive()) {
+            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel is closed"));
+        }
+        if (rst) {
+            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel has reset"));
+        }
+        return http2StreamChannel.newSucceededFuture();
+    }
+
     /**
      * @return transport listener
      */
@@ -177,7 +216,8 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
         private Http2Headers trailers;
 
         void handleH2TransportError(TriRpcStatus status) {
-            
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.NO_ERROR), true);
+            
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.NO_ERROR).channel(http2StreamChannel));
+            TripleClientStream.this.rst = true;
             finishProcess(status, null);
         }
 
@@ -310,8 +350,11 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
             executor.execute(() -> {
                 if (endStream) {
                     if (!halfClosed) {
-                        
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL),
-                            true);
+                        if (http2StreamChannel.isActive() && !rst) {
+                            
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL).channel(http2StreamChannel),
+                                true);
+                            rst = true;
+                        }
                     }
                     onTrailersReceived(headers);
                 } else {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index a9aa76c2aa..6b99325f3b 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.rpc.protocol.tri.stream;
 
+import io.netty.handler.codec.http2.Http2StreamChannel;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
@@ -41,13 +42,12 @@ import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
 import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
 import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
 import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
 import com.google.rpc.Status;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
@@ -58,6 +58,7 @@ import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.util.concurrent.Future;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.List;
@@ -71,7 +72,7 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TripleServerStream.class);
     public final ServerTransportObserver transportObserver = new 
ServerTransportObserver();
-    private final WriteQueue writeQueue;
+    private final TripleWriteQueue writeQueue;
     private final PathResolver pathResolver;
     private final List<HeaderFilter> filters;
     private final String acceptEncoding;
@@ -80,22 +81,24 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
     private volatile boolean reset;
     private ServerStream.Listener listener;
     private final InetSocketAddress remoteAddress;
-    private final Channel channel;
     private Deframer deframer;
+    private boolean rst = false;
+    private final Http2StreamChannel http2StreamChannel;
 
-    public TripleServerStream(Channel channel,
+    public TripleServerStream(Http2StreamChannel channel,
                               FrameworkModel frameworkModel,
                               Executor executor,
                               PathResolver pathResolver,
                               String acceptEncoding,
-                              List<HeaderFilter> filters) {
+                              List<HeaderFilter> filters,
+                              TripleWriteQueue writeQueue) {
         super(executor, frameworkModel);
-        this.channel = channel;
         this.pathResolver = pathResolver;
         this.acceptEncoding = acceptEncoding;
         this.filters = filters;
-        this.writeQueue = new WriteQueue(channel);
+        this.writeQueue = writeQueue;
         this.remoteAddress = (InetSocketAddress) channel.remoteAddress();
+        this.http2StreamChannel = channel;
     }
 
     @Override
@@ -109,24 +112,31 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
     }
 
     public ChannelFuture reset(Http2Error cause) {
-        return writeQueue.enqueue(CancelQueueCommand.createCommand(cause), 
true);
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
+        this.rst = true;
+        return 
writeQueue.enqueue(CancelQueueCommand.createCommand(cause).channel(http2StreamChannel));
     }
 
     @Override
     public ChannelFuture sendHeader(Http2Headers headers) {
         if (reset) {
-            return writeQueue.failure(
-                new IllegalStateException("Stream already reset, no more 
headers allowed"));
+            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Stream already reset, no more headers allowed"));
         }
         if (headerSent) {
-            return writeQueue.failure(new IllegalStateException("Header 
already sent"));
+            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Header already sent"));
         }
         if (trailersSent) {
-            return writeQueue.failure(new IllegalStateException("Trailers 
already sent"));
+            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Trailers already sent"));
+        }
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
         }
         headerSent = true;
-
-        return writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, 
false))
+        return writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, 
false).channel(http2StreamChannel))
             .addListener(f -> {
                 if (!f.isSuccess()) {
                     reset(Http2Error.INTERNAL_ERROR);
@@ -137,7 +147,7 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
     @Override
     public Future<?> cancelByLocal(TriRpcStatus status) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(String.format("Cancel stream:%s by local: %s", 
channel, status));
+            LOGGER.debug(String.format("Cancel stream:%s by local: %s", 
http2StreamChannel, status));
         }
         return reset(Http2Error.CANCEL);
     }
@@ -151,15 +161,18 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
 
     private ChannelFuture sendTrailers(Http2Headers trailers) {
         if (reset) {
-            return writeQueue.failure(
-                new IllegalStateException("Stream already reset, no more 
trailers allowed"));
+            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Stream already reset, no more trailers allowed"));
         }
         if (trailersSent) {
-            return writeQueue.failure(new IllegalStateException("Trailers 
already sent"));
+            return http2StreamChannel.newFailedFuture(new 
IllegalStateException("Trailers already sent"));
+        }
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
         }
         headerSent = true;
         trailersSent = true;
-        return writeQueue.enqueue(HeaderQueueCommand.createHeaders(trailers, 
true))
+        return writeQueue.enqueue(HeaderQueueCommand.createHeaders(trailers, 
true).channel(http2StreamChannel))
             .addListener(f -> {
                 if (!f.isSuccess()) {
                     reset(Http2Error.INTERNAL_ERROR);
@@ -213,18 +226,22 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
     @Override
     public ChannelFuture sendMessage(byte[] message, int compressFlag) {
         if (reset) {
-            return writeQueue.failure(
+            return http2StreamChannel.newFailedFuture(
                 new IllegalStateException("Stream already reset, no more body 
allowed"));
         }
         if (!headerSent) {
-            return writeQueue.failure(
+            return http2StreamChannel.newFailedFuture(
                 new IllegalStateException("Headers did not sent before send 
body"));
         }
         if (trailersSent) {
-            return writeQueue.failure(
+            return http2StreamChannel.newFailedFuture(
                 new IllegalStateException("Trailers already sent, no more body 
allowed"));
         }
-        return writeQueue.enqueue(DataQueueCommand.createGrpcCommand(message, 
false, compressFlag));
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return checkResult;
+        }
+        return writeQueue.enqueue(DataQueueCommand.createGrpcCommand(message, 
false, compressFlag).channel(http2StreamChannel));
     }
 
     /**
@@ -234,12 +251,16 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
      * @param status status of error
      */
     private void responsePlainTextError(int code, TriRpcStatus status) {
+        ChannelFuture checkResult = preCheck();
+        if (!checkResult.isSuccess()) {
+            return;
+        }
         Http2Headers headers = new 
DefaultHttp2Headers(true).status(String.valueOf(code))
             .setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
             .set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
             .set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), 
TripleConstant.TEXT_PLAIN_UTF8);
-        writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false));
-        
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, 
true));
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders( headers, 
false).channel(http2StreamChannel));
+        
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, 
true).channel(http2StreamChannel));
     }
 
     /**
@@ -275,6 +296,16 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
         return invoker;
     }
 
+    private ChannelFuture preCheck() {
+        if (!http2StreamChannel.isActive()) {
+            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel is closed"));
+        }
+        if (rst) {
+            return http2StreamChannel.newFailedFuture(new IOException("stream 
channel has reset"));
+        }
+        return http2StreamChannel.newSucceededFuture();
+    }
+
     public class ServerTransportObserver extends AbstractH2TransportListener 
implements
         H2TransportListener {
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
index 048d24644f..4e6c992828 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.rpc.protocol.tri.transport;
 
+import io.netty.handler.codec.http2.Http2StreamChannel;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.rpc.HeaderFilter;
@@ -31,7 +32,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http2.Http2DataFrame;
 import io.netty.handler.codec.http2.Http2HeadersFrame;
 import io.netty.handler.codec.http2.Http2ResetFrame;
-import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 
@@ -40,9 +40,6 @@ import java.util.concurrent.Executor;
 
 public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
 
-    private static final AttributeKey<TripleServerStream> SERVER_STREAM_KEY = 
AttributeKey.valueOf(
-        "tri_server_stream");
-
 
     private static final Logger LOGGER = LoggerFactory.getLogger(
         TripleHttp2FrameServerHandler.class);
@@ -51,11 +48,14 @@ public class TripleHttp2FrameServerHandler extends 
ChannelDuplexHandler {
     private final Executor executor;
     private final List<HeaderFilter> filters;
     private final String acceptEncoding;
+    private final TripleServerStream tripleServerStream;
 
     public TripleHttp2FrameServerHandler(
         FrameworkModel frameworkModel,
         Executor executor,
-        List<HeaderFilter> filters) {
+        List<HeaderFilter> filters,
+        Http2StreamChannel channel,
+        TripleWriteQueue writeQueue) {
         this.frameworkModel = frameworkModel;
         this.executor = executor;
         this.filters = filters;
@@ -63,6 +63,7 @@ public class TripleHttp2FrameServerHandler extends 
ChannelDuplexHandler {
             
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions());
         this.pathResolver = 
frameworkModel.getExtensionLoader(PathResolver.class)
             .getDefaultExtension();
+        tripleServerStream = new TripleServerStream(channel, frameworkModel, 
executor, pathResolver, acceptEncoding, filters, writeQueue);
     }
 
     @Override
@@ -87,8 +88,6 @@ public class TripleHttp2FrameServerHandler extends 
ChannelDuplexHandler {
     }
 
     public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
-        final TripleServerStream tripleServerStream = 
ctx.channel().attr(SERVER_STREAM_KEY)
-            .get();
         LOGGER.warn("Triple Server received remote reset errorCode=" + 
frame.errorCode());
         if (tripleServerStream != null) {
             
tripleServerStream.transportObserver.cancelByRemote(frame.errorCode());
@@ -102,24 +101,14 @@ public class TripleHttp2FrameServerHandler extends 
ChannelDuplexHandler {
         }
         TriRpcStatus status = TriRpcStatus.getStatus(cause,
             "Provider's error:\n" + cause.getMessage());
-        final TripleServerStream tripleServerStream = 
ctx.channel().attr(SERVER_STREAM_KEY)
-            .get();
-        if (tripleServerStream != null) {
-            tripleServerStream.cancelByLocal(status);
-        }
+        tripleServerStream.cancelByLocal(status);
     }
 
     public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) 
throws Exception {
-        final TripleServerStream tripleServerStream = 
ctx.channel().attr(SERVER_STREAM_KEY)
-            .get();
         tripleServerStream.transportObserver.onData(msg.content(), 
msg.isEndStream());
     }
 
     public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame 
msg) throws Exception {
-        TripleServerStream tripleServerStream = new 
TripleServerStream(ctx.channel(),
-            frameworkModel, executor,
-            pathResolver, acceptEncoding, filters);
-        ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
         tripleServerStream.transportObserver.onHeader(msg.headers(), 
msg.isEndStream());
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
new file mode 100644
index 0000000000..62e586471d
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import org.apache.dubbo.common.BatchExecutorQueue;
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+public class TripleWriteQueue extends BatchExecutorQueue<QueuedCommand> {
+
+    public ChannelFuture enqueue(QueuedCommand command, boolean rst) {
+        return enqueue(command);
+    }
+
+    public ChannelFuture enqueue(QueuedCommand command) {
+        ChannelPromise promise = command.promise();
+        if (promise == null) {
+            Channel ch = command.channel();
+            promise = ch.newPromise();
+            command.promise(promise);
+        }
+        super.enqueue(command, command.channel().eventLoop());
+        return promise;
+    }
+
+    @Override
+    protected void prepare(QueuedCommand item) {
+        item.run(item.channel());
+    }
+
+    @Override
+    protected void flush(QueuedCommand item) {
+        Channel channel = item.channel();
+        item.run(channel);
+        channel.flush();
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
index 2d70f462d7..b0125681b8 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
@@ -23,89 +23,67 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPromise;
 
-import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Deprecated
 public class WriteQueue {
 
     static final int DEQUE_CHUNK_SIZE = 128;
-    private final Channel channel;
     private final Queue<QueuedCommand> queue;
     private final AtomicBoolean scheduled;
-    private volatile boolean rst;
 
-    public WriteQueue(Channel channel) {
-        this.channel = channel;
+    public WriteQueue() {
         queue = new ConcurrentLinkedQueue<>();
         scheduled = new AtomicBoolean(false);
     }
 
-    public ChannelFuture success() {
-        return channel.newSucceededFuture();
-    }
-
-    public ChannelFuture failure(Throwable cause) {
-        return channel.newFailedFuture(cause);
-    }
-
     public ChannelFuture enqueue(QueuedCommand command, boolean rst) {
-        ChannelFuture future = enqueue(command);
-        if (rst) {
-            this.rst = true;
-        }
-        return future;
+        return enqueue(command);
     }
 
     public ChannelFuture enqueue(QueuedCommand command) {
-        if (!channel.isActive()) {
-            return channel.newFailedFuture(new IOException("channel is 
closed"));
-        }
-        if (rst) {
-            return channel.newFailedFuture(new IOException("channel has 
reset"));
-        }
         ChannelPromise promise = command.promise();
         if (promise == null) {
-            promise = channel.newPromise();
+            Channel ch = command.channel();
+            promise = ch.newPromise();
             command.promise(promise);
         }
         queue.add(command);
-        scheduleFlush();
+        scheduleFlush(command.channel());
         return promise;
     }
 
-    public void scheduleFlush() {
+    public void scheduleFlush(Channel ch) {
         if (scheduled.compareAndSet(false, true)) {
-            channel.eventLoop().execute(this::flush);
+            ch.parent().eventLoop().execute(this::flush);
         }
     }
 
-    public void close() {
-        channel.close();
-    }
-
     private void flush() {
+        Channel ch = null;
         try {
             QueuedCommand cmd;
             int i = 0;
             boolean flushedOnce = false;
             while ((cmd = queue.poll()) != null) {
-                cmd.run(channel);
+                ch = cmd.channel();
+                cmd.run(ch);
                 i++;
                 if (i == DEQUE_CHUNK_SIZE) {
                     i = 0;
-                    channel.flush();
+                    ch.parent().flush();
                     flushedOnce = true;
                 }
             }
-            if (i != 0 || !flushedOnce) {
-                channel.flush();
+            if (ch != null && (i != 0 || !flushedOnce)) {
+                ch.parent().flush();
             }
         } finally {
             scheduled.set(false);
             if (!queue.isEmpty()) {
-                scheduleFlush();
+                scheduleFlush(ch);
             }
         }
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index 4951f85f9f..c89f6c3998 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.dubbo.rpc.protocol.tri.stream;
 
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.http2.Http2StreamChannel;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -34,7 +36,7 @@ import 
org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
 import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
 import org.apache.dubbo.rpc.protocol.tri.support.IGreeter;
 import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -66,11 +68,15 @@ class TripleClientStreamTest {
             new Class<?>[]{String.class});
 
         MockClientStreamListener listener = new MockClientStreamListener();
-        WriteQueue writeQueue = mock(WriteQueue.class);
+        TripleWriteQueue writeQueue = mock(TripleWriteQueue.class);
         final EmbeddedChannel channel = new EmbeddedChannel();
         when(writeQueue.enqueue(any())).thenReturn(channel.newPromise());
+        Http2StreamChannel http2StreamChannel = mock(Http2StreamChannel.class);
+        when(http2StreamChannel.isActive()).thenReturn(true);
+        
when(http2StreamChannel.newSucceededFuture()).thenReturn(channel.newSucceededFuture());
+        when(http2StreamChannel.eventLoop()).thenReturn(new 
NioEventLoopGroup().next());
         TripleClientStream stream = new 
TripleClientStream(url.getOrDefaultFrameworkModel(),
-            ImmediateEventExecutor.INSTANCE, writeQueue, listener);
+            ImmediateEventExecutor.INSTANCE, writeQueue, listener, 
http2StreamChannel);
 
         final RequestMetadata requestMetadata = new RequestMetadata();
         requestMetadata.method = methodDescriptor;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
index 7892628863..bd7b19f5bb 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
@@ -53,8 +53,12 @@ public class WriteQueueTest {
     @BeforeEach
     public void init() {
         channel = Mockito.mock(Channel.class);
+        Channel parent = Mockito.mock(Channel.class);
         ChannelPromise promise = Mockito.mock(ChannelPromise.class);
         EventLoop eventLoop = new DefaultEventLoop();
+        Mockito.when(parent.eventLoop()).thenReturn(eventLoop);
+
+        Mockito.when(channel.parent()).thenReturn(parent);
         Mockito.when(channel.eventLoop()).thenReturn(eventLoop);
         Mockito.when(channel.isActive()).thenReturn(true);
         Mockito.when(channel.newPromise()).thenReturn(promise);
@@ -70,14 +74,14 @@ public class WriteQueueTest {
     @Test
     public void test() throws Exception {
 
-        WriteQueue writeQueue = new WriteQueue(channel);
-        writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()));
-        writeQueue.enqueue(DataQueueCommand.createGrpcCommand(new byte[0], 
false, 0));
+        WriteQueue writeQueue = new WriteQueue();
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()).channel(channel));
+        writeQueue.enqueue(DataQueueCommand.createGrpcCommand(new byte[0], 
false, 0).channel(channel));
         TriRpcStatus status = TriRpcStatus.UNKNOWN
                 .withCause(new RpcException())
                 .withDescription("Encode Response data error");
-        
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL));
-        
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, 
true));
+        
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL).channel(channel));
+        
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, 
true).channel(channel));
 
         while (writeMethodCalledTimes.get() != 4) {
             Thread.sleep(50);
@@ -96,13 +100,13 @@ public class WriteQueueTest {
 
     @Test
     public void testChunk() throws Exception {
-        WriteQueue writeQueue = new WriteQueue(channel);
+        WriteQueue writeQueue = new WriteQueue();
         // test deque chunk size
         writeMethodCalledTimes.set(0);
         for (int i = 0; i < DEQUE_CHUNK_SIZE; i++) {
-            writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()));
+            writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()).channel(channel));
         }
-        writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()));
+        writeQueue.enqueue(HeaderQueueCommand.createHeaders(new 
DefaultHttp2Headers()).channel(channel));
         while (writeMethodCalledTimes.get() != (DEQUE_CHUNK_SIZE + 1)) {
             Thread.sleep(50);
         }


Reply via email to