This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch md
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/md by this push:
new 28d0fe005ba partial
28d0fe005ba is described below
commit 28d0fe005ba02688b48b0d72ff3e09386ffdf630
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jan 19 19:24:02 2026 +0800
partial
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 1 +
...imeSinkNonReportTimeConfigurableException.java} | 20 +++++++++++--------
...RuntimeSinkRetryTimesConfigurableException.java | 7 ++++++-
.../pipe/receiver/PipeReceiverStatusHandler.java | 23 +++++++---------------
4 files changed, 26 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index dba2269b281..a46e174ef12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -66,6 +66,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// when no event can be pulled.
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
+ private long lastExceptionTime = Long.MIN_VALUE;
public PipeSinkSubtask(
final String taskID,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
similarity index 70%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
index 62b54334a93..d5fba19b650 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java
@@ -19,24 +19,28 @@
package org.apache.iotdb.commons.exception.pipe;
-public class PipeRuntimeSinkRetryTimesConfigurableException
+public class PipeRuntimeSinkNonReportTimeConfigurableException
extends PipeRuntimeSinkCriticalException {
- private final int retryTimes;
+ private final long interval;
- public PipeRuntimeSinkRetryTimesConfigurableException(
- final String message, final int retryTimes) {
+ public PipeRuntimeSinkNonReportTimeConfigurableException(
+ final String message, final long interval) {
super(message);
- this.retryTimes = retryTimes;
+ this.interval = interval;
}
- public int getRetryTimes() {
- return retryTimes;
+ public long getInterval() {
+ return interval;
}
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" +
getMessage() + "}";
+ return "PipeRuntimeSinkNonReportTimeConfigurableException{"
+ + "message='"
+ + "', interval='"
+ + interval
+ + "'}";
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
index 62b54334a93..495098683e3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java
@@ -37,6 +37,11 @@ public class PipeRuntimeSinkRetryTimesConfigurableException
// We do not record the timestamp here for logger reduction detection
@Override
public String toString() {
- return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" +
getMessage() + "}";
+ return "PipeRuntimeSinkRetryTimesConfigurableException{"
+ + "message='"
+ + getMessage()
+ + "', retryTimes='"
+ + retryTimes
+ + "'}";
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index fb997d0b14a..e20ee03c275 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -22,8 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
-import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -142,7 +141,8 @@ public class PipeReceiverStatusHandler {
LOGGER::info,
"Temporary unavailable exception: will retry forever. status:
%s",
status);
- throw new PipeNonReportException(exceptionMessage);
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, Long.MAX_VALUE);
}
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION
@@ -184,13 +184,8 @@ public class PipeReceiverStatusHandler {
throw status.getCode() == 1815
&&
PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict()
? new PipeNonReportException(exceptionMessage)
- : new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(
- CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
+ : new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, retryMaxMillisWhenConflictOccurs);
}
case 803: // NO_PERMISSION
@@ -266,12 +261,8 @@ public class PipeReceiverStatusHandler {
}
exceptionEventHasBeenRetried.set(true);
- throw new PipeRuntimeSinkRetryTimesConfigurableException(
- exceptionMessage,
- (int)
- Math.max(
- PipeSubtask.MAX_RETRY_TIMES,
- Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(
+ exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur);
}
private static String getNoPermission(final boolean noPermission) {