This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-borrow-timeout-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-borrow-timeout-opt by 
this push:
     new e828dfef99a fix
e828dfef99a is described below

commit e828dfef99ab01789b52285f2652cfa2eb0d767d
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 18:26:36 2025 +0800

    fix
---
 .../async/handler/PipeTransferTrackableHandler.java  | 20 +++++++++++++++++++-
 .../async/handler/PipeTransferTsFileHandler.java     | 20 ++++++++------------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 25a341d954d..d5e765288d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -26,11 +26,17 @@ import 
org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 
 public abstract class PipeTransferTrackableHandler
     implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
   protected final IoTDBDataRegionAsyncSink connector;
+  protected volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink 
connector) {
     this.connector = connector;
@@ -77,6 +83,9 @@ public abstract class PipeTransferTrackableHandler
   protected boolean tryTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
+    if (Objects.isNull(this.client)) {
+      this.client = client;
+    }
     // track handler before checking if connector is closed
     connector.trackHandler(this);
     if (connector.isClosed()) {
@@ -106,6 +115,15 @@ public abstract class PipeTransferTrackableHandler
 
   @Override
   public void close() {
-    // do nothing
+    try {
+      client.close();
+      client.invalidateAll();
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to close or invalidate client when connector is closed. 
Client: {}, Exception: {}",
+          client,
+          e.getMessage(),
+          e);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index e40bdacd6a5..ed4a0517323 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -87,7 +87,6 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   private final AtomicBoolean isSealSignalSent;
 
   private IoTDBDataNodeAsyncClientManager clientManager;
-  private volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileHandler(
       final IoTDBDataRegionAsyncSink connector,
@@ -419,19 +418,16 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     }
 
     if (connector.isClosed()) {
-      try {
-        client.close();
-        client.invalidateAll();
-      } catch (final Exception e) {
-        LOGGER.warn(
-            "Failed to close or invalidate client when connector is closed. 
Client: {}, Exception: {}",
-            client,
-            e.getMessage(),
-            e);
-      }
+      close();
     }
+
     client.setShouldReturnSelf(true);
-    client.returnSelf();
+    try {
+      client.returnSelf();
+    } catch (final IllegalStateException e) {
+      LOGGER.info(
+          "Illegal state when return the client to object pool, maybe the pool 
is already cleared. Will ignore.");
+    }
     client = null;
   }
 

Reply via email to