This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8657782e4c8 Pipe: Optimized logger for temporarily out of memory exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542) (#17728)
8657782e4c8 is described below

commit 8657782e4c8bbfea602156005ed80ece0f29cc62
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 20 18:06:17 2026 +0800

    Pipe: Optimized logger for temporarily out of memory exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542) (#17728)
---
 .../subtask/processor/PipeProcessorSubtask.java     |  8 ++++----
 .../common/tsfile/PipeTsFileInsertionEvent.java     | 21 +++++++++++++--------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index ca5a8d0f4db..193693c5a95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -191,15 +191,15 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
       PipeLogger.log(
           LOGGER::info,
-          e,
-          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
+          "Temporarily out of memory in pipe event processing, will wait for 
the memory to release. Message: %s",
+          e.getMessage());
       return false;
     } catch (final Exception e) {
       if (ExceptionUtils.getRootCause(e) instanceof 
PipeRuntimeOutOfMemoryCriticalException) {
         PipeLogger.log(
             LOGGER::info,
-            e,
-            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.");
+            "Temporarily out of memory in pipe event processing, will wait for 
the memory to release. Message: %s",
+            e.getMessage());
         return false;
       }
       if (!isClosed.get()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 53f0b16826a..8ffbc9f2f9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -495,8 +496,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
                 callerName,
                 getTsFile(),
                 tabletEventCount,
-                retryCount,
-                e);
+                retryCount);
           } else if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(
                 "{}: failed to allocate memory for parsing TsFile {}, tablet 
event no. {}, retry count is {}, will keep retrying.",
@@ -542,7 +542,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
                   "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath())
               : String.format(
                   "Parse TsFile %s error. Because: %s", 
resource.getTsFilePath(), e.getMessage());
-      LOGGER.warn(errorMsg, e);
+      if (e instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        PipeLogger.log(LOGGER::warn, errorMsg);
+      } else {
+        PipeLogger.log(LOGGER::warn, e, errorMsg);
+      }
       throw new PipeException(errorMsg);
     }
   }
@@ -566,28 +570,29 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent
       final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
       if (elapsedRecordTimeSeconds > 10.0) {
         LOGGER.info(
-            "Wait for resource enough for parsing {} for {} seconds.",
+            "Wait for memory enough for parsing {} for {} seconds.",
             resource != null ? resource.getTsFilePath() : "tsfile",
             waitTimeSeconds);
         lastRecordTime = currentTime;
       } else if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "Wait for resource enough for parsing {} for {} seconds.",
+            "Wait for memory enough for parsing {} for {} seconds.",
             resource != null ? resource.getTsFilePath() : "tsfile",
             waitTimeSeconds);
       }
 
       if (waitTimeSeconds * 1000 > timeoutMs) {
         // should contain 'TimeoutException' in exception message
-        throw new PipeException(
-            String.format("TimeoutException: Waited %s seconds", 
waitTimeSeconds));
+        throw new PipeRuntimeOutOfMemoryCriticalException(
+            String.format(
+                "TimeoutException: Waited %s seconds for memory to parse 
TsFile", waitTimeSeconds));
       }
     }
 
     final long currentTime = System.currentTimeMillis();
     final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
     LOGGER.info(
-        "Wait for resource enough for parsing {} for {} seconds.",
+        "Wait for memory enough for parsing {} for {} seconds.",
         resource != null ? resource.getTsFilePath() : "tsfile",
         waitTimeSeconds);
   }

Reply via email to