This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch logger-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/logger-pipe by this push:
new 21b2e77ad40 fix
21b2e77ad40 is described below
commit 21b2e77ad401839b7abab783aa99cd5c77747bc3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 19:04:56 2026 +0800
fix
---
.../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index af75e6424f7..5f7156f6324 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.commons.pipe.agent.task.subtask;
+import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
@@ -33,6 +35,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,7 +271,13 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
protected void handleException(final Event event, final Exception e) {
- if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
+ if (e instanceof PipeRuntimeOutOfMemoryCriticalException
+ || ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
+ PipeLogger.log(
+ LOGGER::info,
+ e,
+ "Temporarily out of memory in pipe event transferring, will wait for
the memory to release.");
+ } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException)
{
if (lastExceptionTime == Long.MAX_VALUE) {
lastExceptionTime = System.currentTimeMillis();
}