This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch pipe-stack
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-stack by this push:
     new 3a2356146d2 opti-pip-log
3a2356146d2 is described below

commit 3a2356146d2538339bd2fa5ddf7771fd2a3d1232
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 23 10:22:30 2026 +0800

    opti-pip-log
---
 .../subtask/processor/PipeProcessorSubtask.java     |  8 ++++----
 .../common/tsfile/PipeTsFileInsertionEvent.java     | 21 +++++++++++++--------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 688ad1565e8..1163f8e4e8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -237,15 +237,15 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
       PipeLogger.log(
           LOGGER::info,
-          e,
-          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
+          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release. Message: %s",
+          e.getMessage());
       return false;
     } catch (final Exception e) {
       if (ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
         PipeLogger.log(
             LOGGER::info,
-            e,
-            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
+            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release. Message: %s",
+            e.getMessage());
         return false;
       }
       if (!isClosed.get()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index adddc9d7ce5..8bd84ebb7d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
@@ -672,8 +673,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
                 callerName,
                 getTsFile(),
                 tabletEventCount,
-                retryCount,
-                e);
+                retryCount);
           } else if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(
                 "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
@@ -719,7 +719,11 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
                   "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath())
               : String.format(
                   "Parse TsFile %s error. Because: %s", 
resource.getTsFilePath(), e.getMessage());
-      LOGGER.warn(errorMsg, e);
+      if (e instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        PipeLogger.log(LOGGER::warn, errorMsg);
+      } else {
+        PipeLogger.log(LOGGER::warn, e, errorMsg);
+      }
       throw new PipeException(errorMsg, e);
     }
   }
@@ -743,28 +747,29 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
       if (elapsedRecordTimeSeconds > 10.0) {
         LOGGER.info(
-            "Wait for resource enough for parsing {} for {} seconds.",
+            "Wait for memory enough for parsing {} for {} seconds.",
             resource != null ? resource.getTsFilePath() : "tsfile",
             waitTimeSeconds);
         lastRecordTime = currentTime;
       } else if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "Wait for resource enough for parsing {} for {} seconds.",
+            "Wait for memory enough for parsing {} for {} seconds.",
             resource != null ? resource.getTsFilePath() : "tsfile",
             waitTimeSeconds);
       }
 
       if (waitTimeSeconds * 1000 > timeoutMs) {
         // should contain 'TimeoutException' in exception message
-        throw new PipeException(
-            String.format("TimeoutException: Waited %s seconds", 
waitTimeSeconds));
+        throw new PipeRuntimeOutOfMemoryCriticalException(
+            String.format(
+                "TimeoutException: Waited %s seconds for memory to parse 
TsFile", waitTimeSeconds));
       }
     }
 
     final long currentTime = System.currentTimeMillis();
     final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
     LOGGER.info(
-        "Wait for resource enough for parsing {} for {} seconds.",
+        "Wait for memory enough for parsing {} for {} seconds.",
         resource != null ? resource.getTsFilePath() : "tsfile",
         waitTimeSeconds);
   }

Reply via email to