This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bf458d5617e Pipe: Optimized logger for temporarily out of memory
exception & Do not stop pipe for "Waited for memory to parse TsFile" (#17542)
bf458d5617e is described below
commit bf458d5617e6ac0893927c2f9956525be2a0fa77
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 24 11:15:04 2026 +0800
Pipe: Optimized logger for temporarily out of memory exception & Do not
stop pipe for "Waited for memory to parse TsFile" (#17542)
---
.../subtask/processor/PipeProcessorSubtask.java | 8 ++++----
.../common/tsfile/PipeTsFileInsertionEvent.java | 21 +++++++++++++--------
2 files changed, 17 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 688ad1565e8..1163f8e4e8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -237,15 +237,15 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
PipeLogger.log(
LOGGER::info,
- e,
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release. Message: %s",
+ e.getMessage());
return false;
} catch (final Exception e) {
if (ExceptionUtils.getRootCause(e) instanceof
PipeRuntimeOutOfMemoryCriticalException) {
PipeLogger.log(
LOGGER::info,
- e,
- "Temporarily out of memory in pipe event processing, will wait for
the memory to release.");
+ "Temporarily out of memory in pipe event processing, will wait for
the memory to release. Message: %s",
+ e.getMessage());
return false;
}
if (!isClosed.get()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index adddc9d7ce5..8bd84ebb7d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
@@ -672,8 +673,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
callerName,
getTsFile(),
tabletEventCount,
- retryCount,
- e);
+ retryCount);
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"{}: failed to allocate memory for parsing TsFile {}, tablet
event no. {}, retry count is {}, will keep retrying.",
@@ -719,7 +719,11 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
"Interrupted when waiting for closing TsFile %s.",
resource.getTsFilePath())
: String.format(
"Parse TsFile %s error. Because: %s",
resource.getTsFilePath(), e.getMessage());
- LOGGER.warn(errorMsg, e);
+ if (e instanceof PipeRuntimeOutOfMemoryCriticalException) {
+ PipeLogger.log(LOGGER::warn, errorMsg);
+ } else {
+ PipeLogger.log(LOGGER::warn, e, errorMsg);
+ }
throw new PipeException(errorMsg, e);
}
}
@@ -743,28 +747,29 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
if (elapsedRecordTimeSeconds > 10.0) {
LOGGER.info(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
lastRecordTime = currentTime;
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
}
if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
- throw new PipeException(
- String.format("TimeoutException: Waited %s seconds",
waitTimeSeconds));
+ throw new PipeRuntimeOutOfMemoryCriticalException(
+ String.format(
+ "TimeoutException: Waited %s seconds for memory to parse
TsFile", waitTimeSeconds));
}
}
final long currentTime = System.currentTimeMillis();
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
LOGGER.info(
- "Wait for resource enough for parsing {} for {} seconds.",
+ "Wait for memory enough for parsing {} for {} seconds.",
resource != null ? resource.getTsFilePath() : "tsfile",
waitTimeSeconds);
}