This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch logger-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/logger-pipe by this push:
     new 21b2e77ad40 fix
21b2e77ad40 is described below

commit 21b2e77ad401839b7abab783aa99cd5c77747bc3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 19:04:56 2026 +0800

    fix
---
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..5f7156f6324 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   protected void handleException(final Event event, final Exception e) {
-    if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+    if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+        || ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) {
+      PipeLogger.log(
+          LOGGER::info,
+          e,
+          "Temporarily out of memory in pipe event transferring, will wait for the memory to release.");
+    } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
       if (lastExceptionTime == Long.MAX_VALUE) {
         lastExceptionTime = System.currentTimeMillis();
       }

Reply via email to