This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ff98e834bcd Pipe: Add Thrift callback processing logic for 
ClosedChannelException (#16421)
ff98e834bcd is described below

commit ff98e834bcd6b6bb7b2263098bf7e07e9fd546e8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 14:09:02 2025 +0800

    Pipe: Add Thrift callback processing logic for ClosedChannelException 
(#16421)
    
    * Add Thrift callback processing logic for ClosedChannelException
    
    * update
    
    * fix
    
    * fix
---
 .../iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java   | 2 ++
 .../protocol/thrift/async/handler/PipeTransferTrackableHandler.java  | 5 +++++
 .../src/main/java/org/apache/iotdb/commons/client/ThriftClient.java  | 4 +++-
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index ff244215ef6..e9db8021e80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.client;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -259,6 +260,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
           @Override
           public void onError(final Exception e) {
+            ThriftClient.resolveException(e, client);
             PipeLogger.log(
                 LOGGER::warn,
                 e,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 6ea07b0c278..d0d6d054299 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -61,6 +62,10 @@ public abstract class PipeTransferTrackableHandler
 
   @Override
   public void onError(final Exception exception) {
+    if (client != null) {
+      ThriftClient.resolveException(exception, client);
+    }
+
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this, true);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index 1d25e4bc1b8..d2128bedaa4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Optional;
 
 /**
@@ -113,7 +114,8 @@ public interface ThriftClient {
         || (cause instanceof IOException
             && (hasExpectedMessage(cause, "Connection reset by peer")
                 || hasExpectedMessage(cause, "Broken pipe")))
-        || (cause instanceof ConnectException && hasExpectedMessage(cause, 
"Connection refused"));
+        || (cause instanceof ConnectException && hasExpectedMessage(cause, 
"Connection refused")
+            || (cause instanceof ClosedChannelException));
   }
 
   static boolean hasExpectedMessage(Throwable cause, String expectedMessage) {

Reply via email to