This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new 742db047741 fix
742db047741 is described below

commit 742db0477418d3bea5a0db5451290c171752a268
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 10:03:26 2026 +0800

    fix
---
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 35 ++++++++++++++--------
 .../PipeTransferTabletBatchEventHandler.java       |  2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |  2 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  2 +-
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 7b7ce628f82..2e1d630eeb1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -59,6 +60,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -262,10 +264,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   false,
                   sealedFile.left));
         }
-      } catch (final Throwable t) {
-        LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t);
+      } catch (final Exception e) {
+        LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, e);
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          addFailureEventsToRetryQueue(events);
+          addFailureEventsToRetryQueue(events, e);
         }
       }
     } else {
@@ -611,14 +613,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         }
 
         if (remainingEvents <= retryEventQueue.size() + 
retryTsFileQueue.size()) {
-          throw new PipeException(
+          final String message =
               "Failed to retry transferring events in the retry queue. 
Remaining events: "
                   + (retryEventQueue.size() + retryTsFileQueue.size())
                   + " (tablet events: "
                   + retryEventQueueEventCounter.getTabletInsertionEventCount()
                   + ", tsfile events: "
                   + retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                  + ").");
+                  + ").";
+          throw isConnectionException
+              ? new PipeConnectionException(message)
+              : new PipeException(message);
         }
       }
     }
@@ -634,7 +639,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } catch (final Exception e) {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, e);
       }
       return;
     }
@@ -647,14 +652,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } else {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, null);
       }
     } catch (final Exception e) {
       if (tabletInsertionEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) tabletInsertionEvent)
             .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), 
false);
       }
-      addFailureEventToRetryQueue(tabletInsertionEvent);
+      addFailureEventToRetryQueue(tabletInsertionEvent, e);
     }
   }
 
@@ -664,11 +669,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         tsFileInsertionEvent.decreaseReferenceCount(
             IoTDBDataRegionAsyncSink.class.getName(), false);
       } else {
-        addFailureEventToRetryQueue(tsFileInsertionEvent);
+        addFailureEventToRetryQueue(tsFileInsertionEvent, null);
       }
     } catch (final Exception e) {
       
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
 false);
-      addFailureEventToRetryQueue(tsFileInsertionEvent);
+      addFailureEventToRetryQueue(tsFileInsertionEvent, e);
     }
   }
 
@@ -678,7 +683,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    * @param event {@link Event} to retry
    */
   @SuppressWarnings("java:S899")
-  public void addFailureEventToRetryQueue(final Event event) {
+  public void addFailureEventToRetryQueue(final Event event, final Exception 
e) {
+    if (e instanceof PipeConnectionException || 
ThriftClient.isConnectionBroken(e)) {
+      isConnectionException = true;
+    }
     if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
       return;
     }
@@ -714,8 +722,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    *
    * @param events {@link EnrichedEvent}s to retry
    */
-  public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> 
events) {
-    events.forEach(this::addFailureEventToRetryQueue);
+  public void addFailureEventsToRetryQueue(
+      final Iterable<EnrichedEvent> events, final Exception e) {
+    events.forEach(event -> addFailureEventToRetryQueue(event, e));
   }
 
   public boolean isEnableSendTsFileLimit() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 89baaa02794..d95513a2228 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
           events.size(),
           
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
-      connector.addFailureEventsToRetryQueue(events);
+      connector.addFailureEventsToRetryQueue(events, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index df26325a5ca..ca92af92b19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -98,7 +98,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
           event.getCommitterKey(),
           event.getCommitId());
     } finally {
-      connector.addFailureEventToRetryQueue(event);
+      connector.addFailureEventToRetryQueue(event, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 67fa5c9bd8c..6726c9a6701 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -415,7 +415,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          connector.addFailureEventsToRetryQueue(events);
+          connector.addFailureEventsToRetryQueue(events, exception);
         }
       }
     }

Reply via email to