This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/md by this push:
     new 28d0fe005ba partial
28d0fe005ba is described below

commit 28d0fe005ba02688b48b0d72ff3e09386ffdf630
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jan 19 19:24:02 2026 +0800

    partial
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  1 +
 ...imeSinkNonReportTimeConfigurableException.java} | 20 +++++++++++--------
 ...RuntimeSinkRetryTimesConfigurableException.java |  7 ++++++-
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 23 +++++++---------------
 4 files changed, 26 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index dba2269b281..a46e174ef12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -66,6 +66,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
   // when no event can be pulled.
   public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
       new PipeHeartbeatEvent("cron", false);
+  private long lastExceptionTime = Long.MIN_VALUE;
 
   public PipeSinkSubtask(
       final String taskID,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
similarity index 70%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
index 62b54334a93..d5fba19b650 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
@@ -19,24 +19,28 @@
 
 package org.apache.iotdb.commons.exception.pipe;
 
-public class PipeRuntimeSinkRetryTimesConfigurableException
+public class PipeRuntimeSinkNonReportTimeConfigurableException
     extends PipeRuntimeSinkCriticalException {
 
-  private final int retryTimes;
+  private final long interval;
 
-  public PipeRuntimeSinkRetryTimesConfigurableException(
-      final String message, final int retryTimes) {
+  public PipeRuntimeSinkNonReportTimeConfigurableException(
+      final String message, final long interval) {
     super(message);
-    this.retryTimes = retryTimes;
+    this.interval = interval;
   }
 
-  public int getRetryTimes() {
-    return retryTimes;
+  public long getInterval() {
+    return interval;
   }
 
   // We do not record the timestamp here for logger reduction detection
   @Override
   public String toString() {
-    return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + 
getMessage() + "}";
+    return "PipeRuntimeSinkNonReportTimeConfigurableException{"
+        + "message='"
+        + "', interval='"
+        + interval
+        + "'}";
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
index 62b54334a93..495098683e3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
@@ -37,6 +37,11 @@ public class PipeRuntimeSinkRetryTimesConfigurableException
   // We do not record the timestamp here for logger reduction detection
   @Override
   public String toString() {
-    return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + 
getMessage() + "}";
+    return "PipeRuntimeSinkRetryTimesConfigurableException{"
+        + "message='"
+        + getMessage()
+        + "', retryTimes='"
+        + retryTimes
+        + "'}";
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index fb997d0b14a..e20ee03c275 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -22,8 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
-import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
-import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -142,7 +141,8 @@ public class PipeReceiverStatusHandler {
               LOGGER::info,
               "Temporary unavailable exception: will retry forever. status: 
%s",
               status);
-          throw new PipeNonReportException(exceptionMessage);
+          throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+              exceptionMessage, Long.MAX_VALUE);
         }
 
       case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -184,13 +184,8 @@ public class PipeReceiverStatusHandler {
           throw status.getCode() == 1815
                   && 
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
               ? new PipeNonReportException(exceptionMessage)
-              : new PipeRuntimeSinkRetryTimesConfigurableException(
-                  exceptionMessage,
-                  (int)
-                      Math.max(
-                          PipeSubtask.MAX_RETRY_TIMES,
-                          Math.min(
-                              CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
+              : new PipeRuntimeSinkNonReportTimeConfigurableException(
+                  exceptionMessage, retryMaxMillisWhenConflictOccurs);
         }
 
       case 803: // NO_PERMISSION
@@ -266,12 +261,8 @@ public class PipeReceiverStatusHandler {
     }
 
     exceptionEventHasBeenRetried.set(true);
-    throw new PipeRuntimeSinkRetryTimesConfigurableException(
-        exceptionMessage,
-        (int)
-            Math.max(
-                PipeSubtask.MAX_RETRY_TIMES,
-                Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+    throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+        exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur);
   }
 
   private static String getNoPermission(final boolean noPermission) {

Reply via email to