This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b9c212489ba [To dev/1.3] Pipe: Fix and improve async tsfile transfer 
error handling and logging (avoid client connection leak) (#16008) (#16125)
b9c212489ba is described below

commit b9c212489bade0c8f03250203bce67cbfaa660c7
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 8 12:06:02 2025 +0800

    [To dev/1.3] Pipe: Fix and improve async tsfile transfer error handling and 
logging (avoid client connection leak) (#16008) (#16125)
    
    * Pipe: Fix and improve async tsfile transfer error handling and logging
    
    Refactored IoTDBDataRegionAsyncConnector to handle exceptions during 
asynchronous tsfile transfer more gracefully. Now logs warnings instead of 
errors, invokes onError on the handler, and provides more context in log 
messages. Added getTsFile() method to PipeTransferTsFileHandler for better 
logging, and ensured memoryBlock is set to null after closing to prevent 
potential resource leaks.
    
    * fix
    
    * fix
    
    * bald-logger
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 24 ++++++++++-------
 .../handler/PipeTransferTrackableHandler.java      | 30 ++++++++++++++++++++--
 .../async/handler/PipeTransferTsFileHandler.java   | 26 ++++++++++---------
 3 files changed, 57 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 8c50c37fb5e..02febe6477b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -409,8 +409,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     }
   }
 
-  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler)
-      throws Exception {
+  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler) {
     transferTsFileCounter.incrementAndGet();
     CompletableFuture<Void> completableFuture =
         CompletableFuture.supplyAsync(
@@ -432,13 +431,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
       try {
         completableFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", 
e);
-        throw new PipeException("Transfer tsfile event asynchronously was 
interrupted.", e);
-      } catch (Exception e) {
-        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
-        throw e;
+      } catch (final Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          LOGGER.warn(
+              "Transfer tsfile event {} asynchronously was interrupted.",
+              pipeTransferTsFileHandler.getTsFile(),
+              e);
+        }
+
+        pipeTransferTsFileHandler.onError(e);
+        LOGGER.warn(
+            "Failed to transfer tsfile event {} asynchronously.",
+            pipeTransferTsFileHandler.getTsFile(),
+            e);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 25a341d954d..0b7ba7554fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -26,11 +26,17 @@ import 
org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
 
 public abstract class PipeTransferTrackableHandler
     implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
   protected final IoTDBDataRegionAsyncSink connector;
+  protected volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink 
connector) {
     this.connector = connector;
@@ -77,13 +83,21 @@ public abstract class PipeTransferTrackableHandler
   protected boolean tryTransfer(
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
+    if (Objects.isNull(this.client)) {
+      this.client = client;
+    }
     // track handler before checking if connector is closed
     connector.trackHandler(this);
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this);
       client.setShouldReturnSelf(true);
-      client.returnSelf();
+      try {
+        client.returnSelf();
+      } catch (final IllegalStateException e) {
+        LOGGER.info(
+            "Illegal state when return the client to object pool, maybe the 
pool is already cleared. Will ignore.");
+      }
       return false;
     }
     doTransfer(client, req);
@@ -106,6 +120,18 @@ public abstract class PipeTransferTrackableHandler
 
   @Override
   public void close() {
-    // do nothing
+    if (Objects.isNull(client)) {
+      return;
+    }
+    try {
+      client.close();
+      client.invalidateAll();
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to close or invalidate client when connector is closed. 
Client: {}, Exception: {}",
+          client,
+          e.getMessage(),
+          e);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index e6d1f7b95ff..4e8c31e7da9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -88,7 +88,6 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   private final AtomicBoolean isSealSignalSent;
 
   private IoTDBDataNodeAsyncClientManager clientManager;
-  private volatile AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileHandler(
       final IoTDBDataRegionAsyncSink connector,
@@ -130,6 +129,10 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     isSealSignalSent = new AtomicBoolean(false);
   }
 
+  public File getTsFile() {
+    return tsFile;
+  }
+
   public void transfer(
       final IoTDBDataNodeAsyncClientManager clientManager,
       final AsyncPipeDataTransferServiceClient client)
@@ -413,19 +416,16 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     }
 
     if (connector.isClosed()) {
-      try {
-        client.close();
-        client.invalidateAll();
-      } catch (final Exception e) {
-        LOGGER.warn(
-            "Failed to close or invalidate client when connector is closed. 
Client: {}, Exception: {}",
-            client,
-            e.getMessage(),
-            e);
-      }
+      close();
     }
+
     client.setShouldReturnSelf(true);
-    client.returnSelf();
+    try {
+      client.returnSelf();
+    } catch (final IllegalStateException e) {
+      LOGGER.info(
+          "Illegal state when return the client to object pool, maybe the pool 
is already cleared. Will ignore.");
+    }
     client = null;
   }
 
@@ -452,8 +452,10 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   @Override
   public void close() {
     super.close();
+
     if (memoryBlock != null) {
       memoryBlock.close();
+      memoryBlock = null;
     }
   }
 

Reply via email to