This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8080d1ff1a9 Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue (#15179) (#15184)
8080d1ff1a9 is described below

commit 8080d1ff1a9cf1d5ba3ee0d0f0ae87a12aad6b15
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Mar 25 11:57:51 2025 +0800

    Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue (#15179) (#15184)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    
    (cherry picked from commit bc078fccc89c7518db2d44d421373f5c47d51c64)
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index bf0b8df2d6c..b92c066e8be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -199,17 +199,24 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
       final AtomicInteger eventsReferenceCount = new AtomicInteger(sealedFiles.size());
       final AtomicBoolean eventsHadBeenAddedToRetryQueue = new AtomicBoolean(false);
 
-      for (final File sealedFile : sealedFiles) {
-        transfer(
-            new PipeTransferTsFileHandler(
-                this,
-                pipe2WeightMap,
-                events,
-                eventsReferenceCount,
-                eventsHadBeenAddedToRetryQueue,
-                sealedFile,
-                null,
-                false));
+      try {
+        for (final File sealedFile : sealedFiles) {
+          transfer(
+              new PipeTransferTsFileHandler(
+                  this,
+                  pipe2WeightMap,
+                  events,
+                  eventsReferenceCount,
+                  eventsHadBeenAddedToRetryQueue,
+                  sealedFile,
+                  null,
+                  false));
+        }
+      } catch (final Throwable t) {
+        LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
+        if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
+          addFailureEventsToRetryQueue(events);
+        }
       }
     } else {
       LOGGER.warn(

Reply via email to