This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new cce71eb3800 [To dev/1.3] Fixed the potential problem in
DataNodeDevicePathCache & Pipe: Reduced the logs in "PipeReceiverStatusHandler"
(#16397) (#16400)
cce71eb3800 is described below
commit cce71eb38008e45c018b2e3b98af376feb8dee74
Author: Caideyipi <[email protected]>
AuthorDate: Fri Sep 12 10:06:07 2025 +0800
[To dev/1.3] Fixed the potential problem in DataNodeDevicePathCache & Pipe:
Reduced the logs in "PipeReceiverStatusHandler" (#16397) (#16400)
* Update pom.xml
* partial
* Update PipeReceiverStatusHandler.java
---
.../pipe/receiver/PipeReceiverStatusHandler.java | 55 +++++++++++++---------
1 file changed, 32 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 42dddeb40d6..799db749832 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -108,7 +109,10 @@ public class PipeReceiverStatusHandler {
case 1808: // PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
{
- LOGGER.info("Temporary unavailable exception: will retry forever.
status: {}", status);
+ PipeLogger.log(
+ LOGGER::info,
+ "Temporary unavailable exception: will retry forever. status:
%s",
+ status);
throw new PipeRuntimeSinkCriticalException(exceptionMessage);
}
@@ -174,17 +178,19 @@ public class PipeReceiverStatusHandler {
return;
}
- LOGGER.warn(
- "No permission: will retry {}. status: {}",
- retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE
- ? "forever"
- : "for at least "
- + (retryMaxMillisWhenOtherExceptionsOccur
- + exceptionFirstEncounteredTime.get()
- - System.currentTimeMillis())
- / 1000.0
- + " seconds",
- status);
+ // Reduce the log if retry forever (PipeLogger.log call sites here use %s formats, not {})
+ if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
+ PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status);
+ } else {
+ LOGGER.warn(
+ "No permission: will retry for at least {} seconds. status: {}",
+ (retryMaxMillisWhenOtherExceptionsOccur
+ + exceptionFirstEncounteredTime.get()
+ - System.currentTimeMillis())
+ / 1000.0,
+ status);
+ }
+
exceptionEventHasBeenRetried.set(true);
throw new PipeRuntimeSinkRetryTimesConfigurableException(
exceptionMessage,
@@ -210,17 +216,20 @@ public class PipeReceiverStatusHandler {
return;
}
- LOGGER.warn(
- "Unclassified exception: will retry {}. status: {}",
- retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE
- ? "forever"
- : "for at least "
- + (retryMaxMillisWhenOtherExceptionsOccur
- + exceptionFirstEncounteredTime.get()
- - System.currentTimeMillis())
- / 1000.0
- + " seconds",
- status);
+ // Reduce the log if retry forever
+ if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
+ PipeLogger.log(
+ LOGGER::warn, "Unclassified exception: will retry forever.
status: %s", status);
+ } else {
+ LOGGER.warn(
+ "Unclassified exception: will retry for at least {} seconds.
status: {}",
+ (retryMaxMillisWhenOtherExceptionsOccur
+ + exceptionFirstEncounteredTime.get()
+ - System.currentTimeMillis())
+ / 1000.0,
+ status);
+ }
+
exceptionEventHasBeenRetried.set(true);
throw new PipeRuntimeSinkRetryTimesConfigurableException(
exceptionMessage,