This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 4310ea8bda issue 10558: optimize performance for triple protocol(3.2)
(#10587)
4310ea8bda is described below
commit 4310ea8bda25d935f0c476ba9c9471a99c336891
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Sat Oct 22 15:37:51 2022 +0800
issue 10558: optimize performance for triple protocol(3.2) (#10587)
---
.../apache/dubbo/common/BatchExecutorQueue.java | 2 +-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 11 ++-
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 5 +-
.../rpc/protocol/tri/call/TripleClientCall.java | 7 +-
.../tri/command/EndStreamQueueCommand.java | 2 +-
.../rpc/protocol/tri/command/QueuedCommand.java | 12 ++-
.../protocol/tri/stream/TripleClientStream.java | 105 +++++++++++++++------
.../protocol/tri/stream/TripleServerStream.java | 83 +++++++++++-----
.../transport/TripleHttp2FrameServerHandler.java | 25 ++---
.../protocol/tri/transport/TripleWriteQueue.java | 54 +++++++++++
.../rpc/protocol/tri/transport/WriteQueue.java | 52 +++-------
.../tri/stream/TripleClientStreamTest.java | 12 ++-
.../rpc/protocol/tri/transport/WriteQueueTest.java | 20 ++--
13 files changed, 256 insertions(+), 134 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
index 10b5dfae85..4718f21feb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/BatchExecutorQueue.java
@@ -74,7 +74,7 @@ public class BatchExecutorQueue<T> {
i++;
}
}
- if (i != 0 || !flushedOnce) {
+ if ((i != 0 || !flushedOnce) && item != null) {
flush(item);
}
} finally {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 6fba53857c..198dcf8abb 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -37,9 +37,9 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
@@ -48,6 +48,8 @@ import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -125,14 +127,15 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
+ TripleWriteQueue writeQueue = new TripleWriteQueue();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
- new ChannelInitializer<Channel>() {
+ new ChannelInitializer<Http2StreamChannel>() {
@Override
- protected void initChannel(Channel ch) {
+ protected void initChannel(Http2StreamChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(new TripleCommandOutBoundHandler());
p.addLast(new
TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
- headFilters));
+ headFilters, ch, writeQueue));
}
});
List<ChannelHandler> handlers = new ArrayList<>();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 16b4373cc9..89f9012150 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -47,6 +47,7 @@ import
org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
import org.apache.dubbo.rpc.protocol.tri.call.UnaryClientCallListener;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import org.apache.dubbo.rpc.support.RpcUtils;
import io.netty.util.AsciiString;
@@ -80,6 +81,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
private final Set<Invoker<?>> invokers;
private final ExecutorService streamExecutor;
private final String acceptEncodings;
+ private final TripleWriteQueue writeQueue = new TripleWriteQueue();
public TripleInvoker(Class<T> serviceType,
URL url,
@@ -117,7 +119,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
invocation.getMethodName(),
invocation.getParameterTypes());
ClientCall call = new TripleClientCall(connection, streamExecutor,
- getUrl().getOrDefaultFrameworkModel());
+ getUrl().getOrDefaultFrameworkModel(), writeQueue);
AsyncRpcResult result;
try {
@@ -311,5 +313,4 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
return timeout;
}
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index b13af798cd..90db0098c2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -38,6 +38,7 @@ import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import java.io.IOException;
import java.util.HashMap;
@@ -51,6 +52,7 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
private final Connection connection;
private final Executor executor;
private final FrameworkModel frameworkModel;
+ private final TripleWriteQueue writeQueue;
private RequestMetadata requestMetadata;
private ClientStream stream;
private ClientCall.Listener listener;
@@ -60,10 +62,11 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
private boolean done;
public TripleClientCall(Connection connection, Executor executor,
- FrameworkModel frameworkModel) {
+ FrameworkModel frameworkModel, TripleWriteQueue
writeQueue) {
this.connection = connection;
this.executor = executor;
this.frameworkModel = frameworkModel;
+ this.writeQueue= writeQueue;
}
@@ -270,7 +273,7 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
this.requestMetadata = metadata;
this.listener = responseListener;
this.stream = new TripleClientStream(frameworkModel, executor,
connection.getChannel(),
- this);
+ this, writeQueue);
return new ClientCallToObserverAdapter<>(this);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
index 77713cf1bf..9dacafd98f 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/EndStreamQueueCommand.java
@@ -29,6 +29,6 @@ public class EndStreamQueueCommand extends QueuedCommand {
@Override
public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
- ctx.write(new DefaultHttp2DataFrame(true));
+ ctx.write(new DefaultHttp2DataFrame(true), promise);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
index 9fa0b76c12..c16ac22958 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -23,6 +23,8 @@ import io.netty.channel.ChannelPromise;
public abstract class QueuedCommand {
+ protected Channel channel;
+
private ChannelPromise promise;
public ChannelPromise promise() {
@@ -48,10 +50,18 @@ public abstract class QueuedCommand {
public final void send(ChannelHandlerContext ctx, ChannelPromise promise) {
if (ctx.channel().isActive()) {
doSend(ctx, promise);
- ctx.flush();
}
}
+ public QueuedCommand channel(Channel channel) {
+ this.channel = channel;
+ return this;
+ }
+
+ public Channel channel() {
+ return channel;
+ }
+
public abstract void doSend(ChannelHandlerContext ctx, ChannelPromise
promise);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index a3d19abb56..b694c28bc3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -17,6 +17,17 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -32,18 +43,10 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http2.Http2Error;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.Future;
-
+import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -58,56 +61,66 @@ import java.util.concurrent.Executor;
public class TripleClientStream extends AbstractStream implements ClientStream
{
public final ClientStream.Listener listener;
- private final WriteQueue writeQueue;
+ private final TripleWriteQueue writeQueue;
private Deframer deframer;
private final Channel parent;
+ private final Http2StreamChannel http2StreamChannel;
+ private boolean rst;
// for test
TripleClientStream(FrameworkModel frameworkModel,
Executor executor,
- WriteQueue writeQueue,
- ClientStream.Listener listener) {
+ TripleWriteQueue writeQueue,
+ ClientStream.Listener listener,
+ Http2StreamChannel http2StreamChannel) {
super(executor, frameworkModel);
this.parent = null;
this.listener = listener;
this.writeQueue = writeQueue;
+ this.http2StreamChannel = http2StreamChannel;
}
public TripleClientStream(FrameworkModel frameworkModel,
Executor executor,
Channel parent,
- ClientStream.Listener listener) {
+ ClientStream.Listener listener,
+ TripleWriteQueue writeQueue) {
super(executor, frameworkModel);
this.parent = parent;
this.listener = listener;
- this.writeQueue = createWriteQueue(parent);
+ this.writeQueue = writeQueue;
+ this.http2StreamChannel = initHttp2StreamChannel(parent);
}
- private WriteQueue createWriteQueue(Channel parent) {
- final Http2StreamChannelBootstrap bootstrap = new
Http2StreamChannelBootstrap(parent);
- final Future<Http2StreamChannel> future =
bootstrap.open().syncUninterruptibly();
+ private Http2StreamChannel initHttp2StreamChannel(Channel parent) {
+ Http2StreamChannelBootstrap bootstrap = new
Http2StreamChannelBootstrap(parent);
+ Future<Http2StreamChannel> future = bootstrap.handler(new
ChannelInboundHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws
Exception {
+ Channel channel = ctx.channel();
+ channel.pipeline().addLast(new
TripleCommandOutBoundHandler());
+ channel.pipeline().addLast(new
TripleHttp2ClientResponseHandler(createTransportListener()));
+ channel.closeFuture().addListener(f ->
transportException(f.cause()));
+ }
+ }).open().syncUninterruptibly();
if (!future.isSuccess()) {
throw new IllegalStateException("Create remote stream failed.
channel:" + parent);
}
- final Http2StreamChannel channel = future.getNow();
- channel.pipeline()
- .addLast(new TripleCommandOutBoundHandler())
- .addLast(new
TripleHttp2ClientResponseHandler(createTransportListener()));
- channel.closeFuture()
- .addListener(f -> transportException(f.cause()));
- return new WriteQueue(channel);
+ return future.getNow();
}
- public void close() {
- writeQueue.close();
- }
public ChannelFuture sendHeader(Http2Headers headers) {
if (this.writeQueue == null) {
// already processed at createStream()
return parent.newFailedFuture(new IllegalStateException("Stream
already closed"));
}
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
final HeaderQueueCommand headerCmd =
HeaderQueueCommand.createHeaders(headers);
+ headerCmd.channel(http2StreamChannel);
return writeQueue.enqueue(headerCmd).addListener(future -> {
if (!future.isSuccess()) {
transportException(future.cause());
@@ -122,7 +135,13 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
}
public ChannelFuture cancelByLocal(TriRpcStatus status) {
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
final CancelQueueCommand cmd =
CancelQueueCommand.createCommand(Http2Error.CANCEL);
+ cmd.channel(http2StreamChannel);
+ TripleClientStream.this.rst = true;
return this.writeQueue.enqueue(cmd, true);
}
@@ -134,8 +153,13 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
@Override
public ChannelFuture sendMessage(byte[] message, int compressFlag, boolean
eos) {
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
final DataQueueCommand cmd =
DataQueueCommand.createGrpcCommand(message, false,
compressFlag);
+ cmd.channel(http2StreamChannel);
return this.writeQueue.enqueue(cmd)
.addListener(future -> {
if (!future.isSuccess()) {
@@ -156,10 +180,25 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
@Override
public ChannelFuture halfClose() {
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
final EndStreamQueueCommand cmd = EndStreamQueueCommand.create();
+ cmd.channel(http2StreamChannel);
return this.writeQueue.enqueue(cmd);
}
+ private ChannelFuture preCheck() {
+ if (!http2StreamChannel.isActive()) {
+ return http2StreamChannel.newFailedFuture(new IOException("stream
channel is closed"));
+ }
+ if (rst) {
+ return http2StreamChannel.newFailedFuture(new IOException("stream
channel has reset"));
+ }
+ return http2StreamChannel.newSucceededFuture();
+ }
+
/**
* @return transport listener
*/
@@ -177,7 +216,8 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
private Http2Headers trailers;
void handleH2TransportError(TriRpcStatus status) {
-
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.NO_ERROR), true);
+
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.NO_ERROR).channel(http2StreamChannel));
+ TripleClientStream.this.rst = true;
finishProcess(status, null);
}
@@ -310,8 +350,11 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
executor.execute(() -> {
if (endStream) {
if (!halfClosed) {
-
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL),
- true);
+ if (http2StreamChannel.isActive() && !rst) {
+
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL).channel(http2StreamChannel),
+ true);
+ rst = true;
+ }
}
onTrailersReceived(headers);
} else {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index a9aa76c2aa..6b99325f3b 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
+import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -41,13 +42,12 @@ import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.Status;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
@@ -58,6 +58,7 @@ import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.concurrent.Future;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
@@ -71,7 +72,7 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
private static final Logger LOGGER =
LoggerFactory.getLogger(TripleServerStream.class);
public final ServerTransportObserver transportObserver = new
ServerTransportObserver();
- private final WriteQueue writeQueue;
+ private final TripleWriteQueue writeQueue;
private final PathResolver pathResolver;
private final List<HeaderFilter> filters;
private final String acceptEncoding;
@@ -80,22 +81,24 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
private volatile boolean reset;
private ServerStream.Listener listener;
private final InetSocketAddress remoteAddress;
- private final Channel channel;
private Deframer deframer;
+ private boolean rst = false;
+ private final Http2StreamChannel http2StreamChannel;
- public TripleServerStream(Channel channel,
+ public TripleServerStream(Http2StreamChannel channel,
FrameworkModel frameworkModel,
Executor executor,
PathResolver pathResolver,
String acceptEncoding,
- List<HeaderFilter> filters) {
+ List<HeaderFilter> filters,
+ TripleWriteQueue writeQueue) {
super(executor, frameworkModel);
- this.channel = channel;
this.pathResolver = pathResolver;
this.acceptEncoding = acceptEncoding;
this.filters = filters;
- this.writeQueue = new WriteQueue(channel);
+ this.writeQueue = writeQueue;
this.remoteAddress = (InetSocketAddress) channel.remoteAddress();
+ this.http2StreamChannel = channel;
}
@Override
@@ -109,24 +112,31 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
}
public ChannelFuture reset(Http2Error cause) {
- return writeQueue.enqueue(CancelQueueCommand.createCommand(cause),
true);
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
+ this.rst = true;
+ return
writeQueue.enqueue(CancelQueueCommand.createCommand(cause).channel(http2StreamChannel));
}
@Override
public ChannelFuture sendHeader(Http2Headers headers) {
if (reset) {
- return writeQueue.failure(
- new IllegalStateException("Stream already reset, no more
headers allowed"));
+ return http2StreamChannel.newFailedFuture(new
IllegalStateException("Stream already reset, no more headers allowed"));
}
if (headerSent) {
- return writeQueue.failure(new IllegalStateException("Header
already sent"));
+ return http2StreamChannel.newFailedFuture(new
IllegalStateException("Header already sent"));
}
if (trailersSent) {
- return writeQueue.failure(new IllegalStateException("Trailers
already sent"));
+ return http2StreamChannel.newFailedFuture(new
IllegalStateException("Trailers already sent"));
+ }
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
}
headerSent = true;
-
- return writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers,
false))
+ return writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers,
false).channel(http2StreamChannel))
.addListener(f -> {
if (!f.isSuccess()) {
reset(Http2Error.INTERNAL_ERROR);
@@ -137,7 +147,7 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
@Override
public Future<?> cancelByLocal(TriRpcStatus status) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Cancel stream:%s by local: %s",
channel, status));
+ LOGGER.debug(String.format("Cancel stream:%s by local: %s",
http2StreamChannel, status));
}
return reset(Http2Error.CANCEL);
}
@@ -151,15 +161,18 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
private ChannelFuture sendTrailers(Http2Headers trailers) {
if (reset) {
- return writeQueue.failure(
- new IllegalStateException("Stream already reset, no more
trailers allowed"));
+ return http2StreamChannel.newFailedFuture(new
IllegalStateException("Stream already reset, no more trailers allowed"));
}
if (trailersSent) {
- return writeQueue.failure(new IllegalStateException("Trailers
already sent"));
+ return http2StreamChannel.newFailedFuture(new
IllegalStateException("Trailers already sent"));
+ }
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
}
headerSent = true;
trailersSent = true;
- return writeQueue.enqueue(HeaderQueueCommand.createHeaders(trailers,
true))
+ return writeQueue.enqueue(HeaderQueueCommand.createHeaders(trailers,
true).channel(http2StreamChannel))
.addListener(f -> {
if (!f.isSuccess()) {
reset(Http2Error.INTERNAL_ERROR);
@@ -213,18 +226,22 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
@Override
public ChannelFuture sendMessage(byte[] message, int compressFlag) {
if (reset) {
- return writeQueue.failure(
+ return http2StreamChannel.newFailedFuture(
new IllegalStateException("Stream already reset, no more body
allowed"));
}
if (!headerSent) {
- return writeQueue.failure(
+ return http2StreamChannel.newFailedFuture(
new IllegalStateException("Headers did not sent before send
body"));
}
if (trailersSent) {
- return writeQueue.failure(
+ return http2StreamChannel.newFailedFuture(
new IllegalStateException("Trailers already sent, no more body
allowed"));
}
- return writeQueue.enqueue(DataQueueCommand.createGrpcCommand(message,
false, compressFlag));
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return checkResult;
+ }
+ return writeQueue.enqueue(DataQueueCommand.createGrpcCommand(message,
false, compressFlag).channel(http2StreamChannel));
}
/**
@@ -234,12 +251,16 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
* @param status status of error
*/
private void responsePlainTextError(int code, TriRpcStatus status) {
+ ChannelFuture checkResult = preCheck();
+ if (!checkResult.isSuccess()) {
+ return;
+ }
Http2Headers headers = new
DefaultHttp2Headers(true).status(String.valueOf(code))
.setInt(TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)
.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), status.description)
.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(),
TripleConstant.TEXT_PLAIN_UTF8);
- writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false));
-
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description,
true));
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders( headers,
false).channel(http2StreamChannel));
+
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description,
true).channel(http2StreamChannel));
}
/**
@@ -275,6 +296,16 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
return invoker;
}
+ private ChannelFuture preCheck() {
+ if (!http2StreamChannel.isActive()) {
+ return http2StreamChannel.newFailedFuture(new IOException("stream
channel is closed"));
+ }
+ if (rst) {
+ return http2StreamChannel.newFailedFuture(new IOException("stream
channel has reset"));
+ }
+ return http2StreamChannel.newSucceededFuture();
+ }
+
public class ServerTransportObserver extends AbstractH2TransportListener
implements
H2TransportListener {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
index 048d24644f..4e6c992828 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2FrameServerHandler.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri.transport;
+import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.HeaderFilter;
@@ -31,7 +32,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2ResetFrame;
-import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
@@ -40,9 +40,6 @@ import java.util.concurrent.Executor;
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
- private static final AttributeKey<TripleServerStream> SERVER_STREAM_KEY =
AttributeKey.valueOf(
- "tri_server_stream");
-
private static final Logger LOGGER = LoggerFactory.getLogger(
TripleHttp2FrameServerHandler.class);
@@ -51,11 +48,14 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
private final Executor executor;
private final List<HeaderFilter> filters;
private final String acceptEncoding;
+ private final TripleServerStream tripleServerStream;
public TripleHttp2FrameServerHandler(
FrameworkModel frameworkModel,
Executor executor,
- List<HeaderFilter> filters) {
+ List<HeaderFilter> filters,
+ Http2StreamChannel channel,
+ TripleWriteQueue writeQueue) {
this.frameworkModel = frameworkModel;
this.executor = executor;
this.filters = filters;
@@ -63,6 +63,7 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions());
this.pathResolver =
frameworkModel.getExtensionLoader(PathResolver.class)
.getDefaultExtension();
+ tripleServerStream = new TripleServerStream(channel, frameworkModel,
executor, pathResolver, acceptEncoding, filters, writeQueue);
}
@Override
@@ -87,8 +88,6 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
}
public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
- final TripleServerStream tripleServerStream =
ctx.channel().attr(SERVER_STREAM_KEY)
- .get();
LOGGER.warn("Triple Server received remote reset errorCode=" +
frame.errorCode());
if (tripleServerStream != null) {
tripleServerStream.transportObserver.cancelByRemote(frame.errorCode());
@@ -102,24 +101,14 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
}
TriRpcStatus status = TriRpcStatus.getStatus(cause,
"Provider's error:\n" + cause.getMessage());
- final TripleServerStream tripleServerStream =
ctx.channel().attr(SERVER_STREAM_KEY)
- .get();
- if (tripleServerStream != null) {
- tripleServerStream.cancelByLocal(status);
- }
+ tripleServerStream.cancelByLocal(status);
}
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg)
throws Exception {
- final TripleServerStream tripleServerStream =
ctx.channel().attr(SERVER_STREAM_KEY)
- .get();
tripleServerStream.transportObserver.onData(msg.content(),
msg.isEndStream());
}
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame
msg) throws Exception {
- TripleServerStream tripleServerStream = new
TripleServerStream(ctx.channel(),
- frameworkModel, executor,
- pathResolver, acceptEncoding, filters);
- ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
tripleServerStream.transportObserver.onHeader(msg.headers(),
msg.isEndStream());
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
new file mode 100644
index 0000000000..62e586471d
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleWriteQueue.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import org.apache.dubbo.common.BatchExecutorQueue;
+import org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
+
+public class TripleWriteQueue extends BatchExecutorQueue<QueuedCommand> {
+
+ public ChannelFuture enqueue(QueuedCommand command, boolean rst) {
+ return enqueue(command);
+ }
+
+ public ChannelFuture enqueue(QueuedCommand command) {
+ ChannelPromise promise = command.promise();
+ if (promise == null) {
+ Channel ch = command.channel();
+ promise = ch.newPromise();
+ command.promise(promise);
+ }
+ super.enqueue(command, command.channel().eventLoop());
+ return promise;
+ }
+
+ @Override
+ protected void prepare(QueuedCommand item) {
+ item.run(item.channel());
+ }
+
+ @Override
+ protected void flush(QueuedCommand item) {
+ Channel channel = item.channel();
+ item.run(channel);
+ channel.flush();
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
index 2d70f462d7..b0125681b8 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueue.java
@@ -23,89 +23,67 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
-import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+@Deprecated
public class WriteQueue {
static final int DEQUE_CHUNK_SIZE = 128;
- private final Channel channel;
private final Queue<QueuedCommand> queue;
private final AtomicBoolean scheduled;
- private volatile boolean rst;
- public WriteQueue(Channel channel) {
- this.channel = channel;
+ public WriteQueue() { // no channel is bound at construction any more; enqueue and flush read it from each command
queue = new ConcurrentLinkedQueue<>();
scheduled = new AtomicBoolean(false);
}
- public ChannelFuture success() {
- return channel.newSucceededFuture();
- }
-
- public ChannelFuture failure(Throwable cause) {
- return channel.newFailedFuture(cause);
- }
-
public ChannelFuture enqueue(QueuedCommand command, boolean rst) {
- ChannelFuture future = enqueue(command);
- if (rst) {
- this.rst = true;
- }
- return future;
+ return enqueue(command); // rst is ignored now that the rst field is removed; this is plain delegation
}
public ChannelFuture enqueue(QueuedCommand command) {
- if (!channel.isActive()) {
- return channel.newFailedFuture(new IOException("channel is
closed"));
- }
- if (rst) {
- return channel.newFailedFuture(new IOException("channel has
reset"));
- }
ChannelPromise promise = command.promise();
if (promise == null) {
- promise = channel.newPromise();
+ Channel ch = command.channel(); // the promise is now created from the channel carried by the command
+ promise = ch.newPromise();
command.promise(promise);
}
queue.add(command);
- scheduleFlush();
+ scheduleFlush(command.channel()); // NOTE(review): the removed isActive()/rst guards have no replacement here; presumably callers check channel state first - verify
return promise;
}
- public void scheduleFlush() {
+ public void scheduleFlush(Channel ch) { // submits flush only when the scheduled flag moves from false to true, so one flush is pending at a time
if (scheduled.compareAndSet(false, true)) {
- channel.eventLoop().execute(this::flush);
+ ch.parent().eventLoop().execute(this::flush); // flush runs on the event loop of the parent channel; NOTE(review): an NPE here (null ch or null parent) happens after the CAS, leaving scheduled stuck at true - verify callers never pass such a channel
}
}
- public void close() {
- channel.close();
- }
-
private void flush() {
+ Channel ch = null; // channel of the most recently run command; stays null when nothing is polled
try {
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
- cmd.run(channel);
+ ch = cmd.channel();
+ cmd.run(ch); // each command now runs against its own channel
i++;
if (i == DEQUE_CHUNK_SIZE) {
i = 0;
- channel.flush();
+ ch.parent().flush(); // one flush per full chunk; NOTE(review): only the parent of the latest command channel is flushed - presumably all queued commands share one parent, verify
flushedOnce = true;
}
}
- if (i != 0 || !flushedOnce) {
- channel.flush();
+ if (ch != null && (i != 0 || !flushedOnce)) {
+ ch.parent().flush(); // final flush for commands run since the last chunk flush, or when no chunk flush happened
}
} finally {
scheduled.set(false);
if (!queue.isEmpty()) {
- scheduleFlush();
+ scheduleFlush(ch); // NOTE(review): ch is still null if this pass polled nothing, and scheduleFlush dereferences ch.parent() - confirm this path is unreachable or guard it
}
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
index 4951f85f9f..c89f6c3998 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStreamTest.java
@@ -17,6 +17,8 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -34,7 +36,7 @@ import
org.apache.dubbo.rpc.protocol.tri.command.QueuedCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.support.IGreeter;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
-import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -66,11 +68,15 @@ class TripleClientStreamTest {
new Class<?>[]{String.class});
MockClientStreamListener listener = new MockClientStreamListener();
- WriteQueue writeQueue = mock(WriteQueue.class);
+ TripleWriteQueue writeQueue = mock(TripleWriteQueue.class);
final EmbeddedChannel channel = new EmbeddedChannel();
when(writeQueue.enqueue(any())).thenReturn(channel.newPromise());
+ Http2StreamChannel http2StreamChannel = mock(Http2StreamChannel.class);
+ when(http2StreamChannel.isActive()).thenReturn(true);
+
when(http2StreamChannel.newSucceededFuture()).thenReturn(channel.newSucceededFuture());
+ when(http2StreamChannel.eventLoop()).thenReturn(new
NioEventLoopGroup().next());
TripleClientStream stream = new
TripleClientStream(url.getOrDefaultFrameworkModel(),
- ImmediateEventExecutor.INSTANCE, writeQueue, listener);
+ ImmediateEventExecutor.INSTANCE, writeQueue, listener,
http2StreamChannel);
final RequestMetadata requestMetadata = new RequestMetadata();
requestMetadata.method = methodDescriptor;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
index 7892628863..bd7b19f5bb 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/WriteQueueTest.java
@@ -53,8 +53,12 @@ public class WriteQueueTest {
@BeforeEach
public void init() {
channel = Mockito.mock(Channel.class);
+ Channel parent = Mockito.mock(Channel.class);
ChannelPromise promise = Mockito.mock(ChannelPromise.class);
EventLoop eventLoop = new DefaultEventLoop();
+ Mockito.when(parent.eventLoop()).thenReturn(eventLoop);
+
+ Mockito.when(channel.parent()).thenReturn(parent);
Mockito.when(channel.eventLoop()).thenReturn(eventLoop);
Mockito.when(channel.isActive()).thenReturn(true);
Mockito.when(channel.newPromise()).thenReturn(promise);
@@ -70,14 +74,14 @@ public class WriteQueueTest {
@Test
public void test() throws Exception {
- WriteQueue writeQueue = new WriteQueue(channel);
- writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()));
- writeQueue.enqueue(DataQueueCommand.createGrpcCommand(new byte[0],
false, 0));
+ WriteQueue writeQueue = new WriteQueue();
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()).channel(channel));
+ writeQueue.enqueue(DataQueueCommand.createGrpcCommand(new byte[0],
false, 0).channel(channel));
TriRpcStatus status = TriRpcStatus.UNKNOWN
.withCause(new RpcException())
.withDescription("Encode Response data error");
-
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL));
-
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description,
true));
+
writeQueue.enqueue(CancelQueueCommand.createCommand(Http2Error.CANCEL).channel(channel));
+
writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description,
true).channel(channel));
while (writeMethodCalledTimes.get() != 4) {
Thread.sleep(50);
@@ -96,13 +100,13 @@ public class WriteQueueTest {
@Test
public void testChunk() throws Exception {
- WriteQueue writeQueue = new WriteQueue(channel);
+ WriteQueue writeQueue = new WriteQueue();
// test deque chunk size
writeMethodCalledTimes.set(0);
for (int i = 0; i < DEQUE_CHUNK_SIZE; i++) {
- writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()));
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()).channel(channel));
}
- writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()));
+ writeQueue.enqueue(HeaderQueueCommand.createHeaders(new
DefaultHttp2Headers()).channel(channel));
while (writeMethodCalledTimes.get() != (DEQUE_CHUNK_SIZE + 1)) {
Thread.sleep(50);
}