This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cf94f1e0114 Pipe: Avoid unnecessary close-client in async client (Follow up fix for #16008)
cf94f1e0114 is described below

commit cf94f1e0114fe84243b20520b65e2408a55665be
Author: Caideyipi <[email protected]>
AuthorDate: Sat Aug 9 14:05:10 2025 +0800

    Pipe: Avoid unnecessary close-client in async client (Follow up fix for #16008)
---
 .../thrift/async/IoTDBDataRegionAsyncSink.java         |  8 ++++++--
 .../async/handler/PipeTransferTrackableHandler.java    | 18 +++++++++++-------
 .../async/handler/PipeTransferTsFileHandler.java       |  2 +-
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 5d6d1beba0b..ff35329ffb9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -756,7 +756,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
           .forEach(
               handler -> {
                 handler.clearEventsReferenceCount();
-                eliminateHandler(handler);
+                eliminateHandler(handler, true);
               });
     }
 
@@ -813,7 +813,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     pendingHandlers.put(handler, handler);
   }
 
-  public void eliminateHandler(final PipeTransferTrackableHandler handler) {
+  public void eliminateHandler(
+      final PipeTransferTrackableHandler handler, final boolean closeClient) {
+    if (closeClient) {
+      handler.closeClient();
+    }
     handler.close();
     pendingHandlers.remove(handler);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 7f0617a2785..6ea07b0c278 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -46,7 +46,7 @@ public abstract class PipeTransferTrackableHandler
   public void onComplete(final TPipeTransferResp response) {
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       return;
     }
 
@@ -55,7 +55,7 @@ public abstract class PipeTransferTrackableHandler
       // completed
       // NOTE: We should not clear the reference count of events, as this would cause the
       // `org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3` test to fail.
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, false);
     }
   }
 
@@ -63,12 +63,12 @@ public abstract class PipeTransferTrackableHandler
   public void onError(final Exception exception) {
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       return;
     }
 
     onErrorInternal(exception);
-    connector.eliminateHandler(this);
+    connector.eliminateHandler(this, false);
   }
 
   /**
@@ -90,7 +90,7 @@ public abstract class PipeTransferTrackableHandler
     connector.trackHandler(this);
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       client.setShouldReturnSelf(true);
       try {
         client.returnSelf();
@@ -119,8 +119,7 @@ public abstract class PipeTransferTrackableHandler
 
   public abstract void clearEventsReferenceCount();
 
-  @Override
-  public void close() {
+  public void closeClient() {
     if (Objects.isNull(client)) {
       return;
     }
@@ -135,4 +134,9 @@ public abstract class PipeTransferTrackableHandler
           e);
     }
   }
+
+  @Override
+  public void close() {
+    // Do nothing
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index ed4a0517323..39c4b148e21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -418,7 +418,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     }
 
     if (connector.isClosed()) {
-      close();
+      closeClient();
     }
 
     client.setShouldReturnSelf(true);

Reply via email to