This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new 97b8cd78554 fix
97b8cd78554 is described below

commit 97b8cd785543436e34b8b4e227aecc0cbf96cff2
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 17:05:24 2026 +0800

    fix
---
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java      |  8 +++++---
 .../pipe/agent/task/subtask/PipeReportableSubtask.java        | 11 +++++++----
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index a5de833a14f..9da901ba6d4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfi
 import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -141,9 +142,10 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
         super.onFailure(throwable);
       } else {
         // Print stack trace for better debugging
-        LOGGER.warn(
-            "A non PipeRuntimeConnectorCriticalException occurred, will throw 
a PipeRuntimeConnectorCriticalException.",
-            throwable);
+        PipeLogger.log(
+            LOGGER::warn,
+            throwable,
+            "A non PipeRuntimeSinkCriticalException occurred, will throw a 
PipeRuntimeSinkCriticalException.");
         super.onFailure(new 
PipeRuntimeSinkCriticalException(throwable.getMessage()));
       }
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index f290f8c4965..934e0ed1bcc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,15 +95,16 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
 
     retryCount.incrementAndGet();
     if (retryCount.get() <= maxRetryTimes) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
+          throwable,
           "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}], last exception: {}",
           taskID,
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
           maxRetryTimes,
-          throwable.getMessage(),
-          throwable);
+          throwable.getMessage());
       try {
         sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable));
       } catch (final InterruptedException e) {
@@ -162,7 +164,8 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
     }
 
     retryCount.incrementAndGet();
-    LOGGER.warn(
+    PipeLogger.log(
+        LOGGER::warn,
         "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}, last exception: {}",
         taskID,
         creationTime,

Reply via email to