This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/connection-retry by this push:
new 43151d3a1f7 try-complete
43151d3a1f7 is described below
commit 43151d3a1f7f6a2048f9189dbf4a63d6ebdb2ea1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:18:44 2026 +0800
try-complete
---
.../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 6 ++++++
.../pipe/agent/task/subtask/PipeReportableSubtask.java | 12 +++++++-----
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 9da901ba6d4..70825d3aaec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -132,6 +132,12 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
// return if the pipe task should be stopped
return;
}
+ if
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+ super.onFailure(
+ new PipeRuntimeSinkNonReportTimeConfigurableException(
+ throwable.getMessage(), Long.MAX_VALUE));
+ return;
+ }
}
// Handle exceptions if any available clients exist
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 934e0ed1bcc..3c1a63064af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
import
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -50,10 +51,11 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
return;
}
- if (lastEvent instanceof EnrichedEvent) {
- onEnrichedEventFailure(throwable);
+ if (lastEvent instanceof EnrichedEvent
+ && !(throwable instanceof
PipeRuntimeSinkNonReportTimeConfigurableException)) {
+ onReportEventFailure(throwable);
} else {
- onNonEnrichedEventFailure(throwable);
+ onNonReportEventFailure(throwable);
}
// Although the pipe task will be stopped, we still don't release the last event here
@@ -76,7 +78,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
return sleepInterval;
}
- private void onEnrichedEventFailure(final Throwable throwable) {
+ private void onReportEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
? ((PipeRuntimeSinkRetryTimesConfigurableException)
throwable).getRetryTimes()
@@ -151,7 +153,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
protected abstract void report(final EnrichedEvent event, final
PipeRuntimeException exception);
- private void onNonEnrichedEventFailure(final Throwable throwable) {
+ private void onNonReportEventFailure(final Throwable throwable) {
if (retryCount.get() == 0) {
LOGGER.warn(
"Failed to execute subtask {} (creation time: {}, simple class: {}),
"