This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 01299904bb Close stream when channel was inactive (#14641)
01299904bb is described below

commit 01299904bbcb48797048d0d41f8b16e33e092340
Author: TomlongTK <longqian...@maoyan.com>
AuthorDate: Fri Sep 6 23:23:38 2024 +0800

    Close stream when channel was inactive (#14641)
    
    * Close stream when channel was inactive
    
    * Do onClose in stream executor
    
    refine
    
    ---------
    
    Co-authored-by: Albumen Kevin <jhq0...@gmail.com>
---
 .../org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java | 8 ++++++++
 .../rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java     | 9 +++++++++
 .../rpc/protocol/tri/stream/AbstractTripleClientStream.java      | 5 +++++
 .../org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java   | 2 ++
 .../dubbo/rpc/protocol/tri/transport/H2TransportListener.java    | 2 ++
 .../protocol/tri/transport/TripleHttp2ClientResponseHandler.java | 1 +
 .../dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java  | 3 +++
 .../protocol/tri/transport/AbstractH2TransportListenerTest.java  | 3 +++
 8 files changed, 33 insertions(+)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 7c983c29d5..9965191497 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -135,6 +135,14 @@ public class TripleClientCall implements ClientCall, ClientStream.Listener {
         }
     }
 
+    @Override
+    public void onClose() {
+        if (done) {
+            return;
+        }
+        onCancelByRemote(TriRpcStatus.CANCELLED);
+    }
+
     @Override
     public void onStart() {
         listener.onStart(this);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
index 309301348c..54db142391 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
+import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.TripleConstants;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
 import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
@@ -133,5 +134,13 @@ public class NegotiateClientCall {
         public void cancelByRemote(long errorCode) {
             executor.execute(() -> future.completeExceptionally(new RuntimeException("Canceled by remote")));
         }
+
+        @Override
+        public void onClose() {
+            if (future.isDone()) {
+                return;
+            }
+            cancelByRemote(TriRpcStatus.CANCELLED.code.code);
+        }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index ae50b9427d..036739f019 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -453,5 +453,10 @@ public abstract class AbstractTripleClientStream extends AbstractStream implemen
                 finishProcess(transportError, null, false);
             });
         }
+
+        @Override
+        public void onClose() {
+            executor.execute(listener::onClose);
+        }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
index 178a9f310e..94bff9865d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java
@@ -56,6 +56,8 @@ public interface ClientStream extends Stream {
                 boolean isReturnTriException) {
             onComplete(status, attachments);
         }
+
+        void onClose();
     }
 
     /**
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java
index 2d6c62e0be..78f0569ac4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/H2TransportListener.java
@@ -42,4 +42,6 @@ public interface H2TransportListener {
     void onData(ByteBuf data, boolean endStream);
 
     void cancelByRemote(long errorCode);
+
+    void onClose();
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java
index b8e9230eb5..e7a1d8b297 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2ClientResponseHandler.java
@@ -80,6 +80,7 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) {
+        transportListener.onClose();
         ctx.close();
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java
index 4daeea8137..35219f1f05 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java
@@ -36,6 +36,9 @@ public class MockClientStreamListener implements ClientStream.Listener {
         this.status = status;
     }
 
+    @Override
+    public void onClose() {}
+
     @Override
     public void onMessage(byte[] message, boolean isNeedReturnException) {
         this.message = message;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java
index 92d5b3c65c..e17496372b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/transport/AbstractH2TransportListenerTest.java
@@ -40,6 +40,9 @@ class AbstractH2TransportListenerTest {
 
             @Override
             public void cancelByRemote(long errorCode) {}
+
+            @Override
+            public void onClose() {}
         };
         DefaultHttp2Headers headers = new DefaultHttp2Headers();
         headers.scheme(HTTPS.name()).path("/foo.bar").method(HttpMethod.POST.asciiName());

Reply via email to