This is an automated email from the ASF dual-hosted git repository. albumenj pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push: new 01299904bb Close stream when channel was inactive (#14641) 01299904bb is described below commit 01299904bbcb48797048d0d41f8b16e33e092340 Author: TomlongTK <longqian...@maoyan.com> AuthorDate: Fri Sep 6 23:23:38 2024 +0800 Close stream when channel was inactive (#14641) * Close stream when channel was inactive * Do onClose in stream executor refine --------- Co-authored-by: Albumen Kevin <jhq0...@gmail.com> --- .../org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java | 8 ++++++++ .../rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java | 9 +++++++++ .../rpc/protocol/tri/stream/AbstractTripleClientStream.java | 5 +++++ .../org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java | 2 ++ .../dubbo/rpc/protocol/tri/transport/H2TransportListener.java | 2 ++ .../protocol/tri/transport/TripleHttp2ClientResponseHandler.java | 1 + .../dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java | 3 +++ .../protocol/tri/transport/AbstractH2TransportListenerTest.java | 3 +++ 8 files changed, 33 insertions(+) diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java index 7c983c29d5..9965191497 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java @@ -135,6 +135,14 @@ public class TripleClientCall implements ClientCall, ClientStream.Listener { } } + @Override + public void onClose() { + if (done) { + return; + } + onCancelByRemote(TriRpcStatus.CANCELLED); + } + @Override public void onStart() { listener.onStart(this); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java index 309301348c..54db142391 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient; import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.rpc.TriRpcStatus; import org.apache.dubbo.rpc.protocol.tri.TripleConstants; import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum; import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener; @@ -133,5 +134,13 @@ public class NegotiateClientCall { public void cancelByRemote(long errorCode) { executor.execute(() -> future.completeExceptionally(new RuntimeException("Canceled by remote"))); } + + @Override + public void onClose() { + if (future.isDone()) { + return; + } + cancelByRemote(TriRpcStatus.CANCELLED.code.code); + } } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java index ae50b9427d..036739f019 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java @@ -453,5 +453,10 @@ public abstract class AbstractTripleClientStream extends AbstractStream implemen finishProcess(transportError, null, false); }); } + + @Override + public void onClose() { + executor.execute(listener::onClose); + } } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java index 178a9f310e..94bff9865d 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java @@ -56,6 +56,8 @@ public interface ClientStream extends Stream { boolean isReturnTriException) { onComplete(status, attachments); } + + void onClose(); } /** diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java index 2d6c62e0be..78f0569ac4 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java @@ -42,4 +42,6 @@ public interface H2TransportListener { void onData(ByteBuf data, boolean endStream); void cancelByRemote(long errorCode); + + void onClose(); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java index b8e9230eb5..e7a1d8b297 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java @@ -80,6 +80,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound @Override public void channelInactive(ChannelHandlerContext ctx) { + transportListener.onClose(); ctx.close(); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java index 4daeea8137..35219f1f05 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java @@ -36,6 +36,9 @@ public class MockClientStreamListener implements ClientStream.Listener { this.status = status; } + @Override + public void onClose() {} + @Override public void onMessage(byte[] message, boolean isNeedReturnException) { this.message = message; diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java index 92d5b3c65c..e17496372b 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java @@ -40,6 +40,9 @@ class AbstractH2TransportListenerTest { @Override public void cancelByRemote(long errorCode) {} + + @Override + public void onClose() {} }; DefaultHttp2Headers headers = new DefaultHttp2Headers(); headers.scheme(HTTPS.name()).path("/foo.bar").method(HttpMethod.POST.asciiName());