This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new 43151d3a1f7 try-complete
43151d3a1f7 is described below

commit 43151d3a1f7f6a2048f9189dbf4a63d6ebdb2ea1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:18:44 2026 +0800

    try-complete
---
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java     |  6 ++++++
 .../pipe/agent/task/subtask/PipeReportableSubtask.java       | 12 +++++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 9da901ba6d4..70825d3aaec 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -132,6 +132,12 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
           // return if the pipe task should be stopped
           return;
         }
+        if (PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+          super.onFailure(
+              new PipeRuntimeSinkNonReportTimeConfigurableException(
+                  throwable.getMessage(), Long.MAX_VALUE));
+          return;
+        }
       }
 
       // Handle exceptions if any available clients exist
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 934e0ed1bcc..3c1a63064af 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
 import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -50,10 +51,11 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
       return;
     }
 
-    if (lastEvent instanceof EnrichedEvent) {
-      onEnrichedEventFailure(throwable);
+    if (lastEvent instanceof EnrichedEvent
+        && !(throwable instanceof PipeRuntimeSinkNonReportTimeConfigurableException)) {
+      onReportEventFailure(throwable);
     } else {
-      onNonEnrichedEventFailure(throwable);
+      onNonReportEventFailure(throwable);
     }
 
     // Although the pipe task will be stopped, we still don't release the last event here
@@ -76,7 +78,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
     return sleepInterval;
   }
 
-  private void onEnrichedEventFailure(final Throwable throwable) {
+  private void onReportEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
             ? ((PipeRuntimeSinkRetryTimesConfigurableException) throwable).getRetryTimes()
@@ -151,7 +153,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
 
   protected abstract void report(final EnrichedEvent event, final PipeRuntimeException exception);
 
-  private void onNonEnrichedEventFailure(final Throwable throwable) {
+  private void onNonReportEventFailure(final Throwable throwable) {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {} (creation time: {}, simple class: {}), 
"

Reply via email to