This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 76d158abdce [To dev/1.3] Pipe: Add Thrift callback processing logic
for ClosedChannelException (#16421) (#16422)
76d158abdce is described below
commit 76d158abdce5d0110e73906b25eee051d8430eb4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 17:40:50 2025 +0800
[To dev/1.3] Pipe: Add Thrift callback processing logic for
ClosedChannelException (#16421) (#16422)
---
.../iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java | 2 ++
.../protocol/thrift/async/handler/PipeTransferTrackableHandler.java | 5 +++++
.../src/main/java/org/apache/iotdb/commons/client/ThriftClient.java | 4 +++-
3 files changed, 10 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 465f1e1cb8e..5901e488a58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -259,6 +260,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
@Override
public void onError(final Exception e) {
+ ThriftClient.resolveException(e, client);
PipeLogger.log(
LOGGER::warn,
e,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 6ea07b0c278..d0d6d054299 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -61,6 +62,10 @@ public abstract class PipeTransferTrackableHandler
@Override
public void onError(final Exception exception) {
+ if (client != null) {
+ ThriftClient.resolveException(exception, client);
+ }
+
if (connector.isClosed()) {
clearEventsReferenceCount();
connector.eliminateHandler(this, true);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index 1d25e4bc1b8..d2128bedaa4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
import java.util.Optional;
/**
@@ -113,7 +114,8 @@ public interface ThriftClient {
|| (cause instanceof IOException
&& (hasExpectedMessage(cause, "Connection reset by peer")
|| hasExpectedMessage(cause, "Broken pipe")))
- || (cause instanceof ConnectException && hasExpectedMessage(cause,
"Connection refused"));
+ || (cause instanceof ConnectException && hasExpectedMessage(cause,
"Connection refused")
+ || (cause instanceof ClosedChannelException));
}
static boolean hasExpectedMessage(Throwable cause, String expectedMessage) {