This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ff98e834bcd Pipe: Add Thrift callback processing logic for ClosedChannelException (#16421)
ff98e834bcd is described below
commit ff98e834bcd6b6bb7b2263098bf7e07e9fd546e8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Sep 16 14:09:02 2025 +0800
Pipe: Add Thrift callback processing logic for ClosedChannelException (#16421)
* Add Thrift callback processing logic for ClosedChannelException
* update
* fix
* fix
---
.../iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java | 2 ++
.../protocol/thrift/async/handler/PipeTransferTrackableHandler.java | 5 +++++
.../src/main/java/org/apache/iotdb/commons/client/ThriftClient.java | 4 +++-
3 files changed, 10 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index ff244215ef6..e9db8021e80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -259,6 +260,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
@Override
public void onError(final Exception e) {
+ ThriftClient.resolveException(e, client);
PipeLogger.log(
LOGGER::warn,
e,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 6ea07b0c278..d0d6d054299 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
+import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -61,6 +62,10 @@ public abstract class PipeTransferTrackableHandler
@Override
public void onError(final Exception exception) {
+ if (client != null) {
+ ThriftClient.resolveException(exception, client);
+ }
+
if (connector.isClosed()) {
clearEventsReferenceCount();
connector.eliminateHandler(this, true);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index 1d25e4bc1b8..d2128bedaa4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
import java.util.Optional;
/**
@@ -113,7 +114,8 @@ public interface ThriftClient {
|| (cause instanceof IOException
&& (hasExpectedMessage(cause, "Connection reset by peer")
|| hasExpectedMessage(cause, "Broken pipe")))
- || (cause instanceof ConnectException && hasExpectedMessage(cause,
"Connection refused"));
+ || (cause instanceof ConnectException && hasExpectedMessage(cause,
"Connection refused")
+ || (cause instanceof ClosedChannelException));
}
static boolean hasExpectedMessage(Throwable cause, String expectedMessage) {