This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 76d158abdce [To dev/1.3] Pipe: Add Thrift callback processing logic 
for ClosedChannelException (#16421) (#16422)
76d158abdce is described below

commit 76d158abdce5d0110e73906b25eee051d8430eb4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 17:40:50 2025 +0800

    [To dev/1.3] Pipe: Add Thrift callback processing logic for 
ClosedChannelException (#16421) (#16422)
---
 .../iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java   | 2 ++
 .../protocol/thrift/async/handler/PipeTransferTrackableHandler.java  | 5 +++++
 .../src/main/java/org/apache/iotdb/commons/client/ThriftClient.java  | 4 +++-
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 465f1e1cb8e..5901e488a58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -259,6 +260,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
           @Override
           public void onError(final Exception e) {
+            ThriftClient.resolveException(e, client);
             PipeLogger.log(
                 LOGGER::warn,
                 e,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 6ea07b0c278..d0d6d054299 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -61,6 +62,10 @@ public abstract class PipeTransferTrackableHandler
 
   @Override
   public void onError(final Exception exception) {
+    if (client != null) {
+      ThriftClient.resolveException(exception, client);
+    }
+
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this, true);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index 1d25e4bc1b8..d2128bedaa4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Optional;
 
 /**
@@ -113,7 +114,8 @@ public interface ThriftClient {
         || (cause instanceof IOException
             && (hasExpectedMessage(cause, "Connection reset by peer")
                 || hasExpectedMessage(cause, "Broken pipe")))
-        || (cause instanceof ConnectException && hasExpectedMessage(cause, 
"Connection refused"));
+        || (cause instanceof ConnectException && hasExpectedMessage(cause, 
"Connection refused")
+            || (cause instanceof ClosedChannelException));
   }
 
   static boolean hasExpectedMessage(Throwable cause, String expectedMessage) {

Reply via email to