This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new bd6e6751e8c Pipe: Avoid the file reading when the memory is not enough
for tablet parsing (#13954) (#13988)
bd6e6751e8c is described below
commit bd6e6751e8c9fb223e6b7143942c39bb938ca001
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 5 15:03:02 2024 +0800
Pipe: Avoid the file reading when the memory is not enough for tablet
parsing (#13954) (#13988)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 30 ++++++++++++++++++++++
.../db/pipe/resource/memory/PipeMemoryManager.java | 5 ++++
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 ++++
.../iotdb/commons/pipe/config/PipeConfig.java | 7 +++++
5 files changed, 57 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9e7a8d421e3..699dceea669 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDat
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -401,6 +403,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
"Pipe skipping temporary TsFile's parsing which shouldn't be
transferred: {}", tsFile);
return Collections.emptyList();
}
+ waitForResourceEnough4Parsing();
return initDataContainer().toTabletInsertionEvents();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
@@ -414,6 +417,33 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
+ /**
+  * Blocks the calling thread until {@link PipeMemoryManager#isEnough4TabletParsing()} reports
+  * that tablet parsing may proceed, so that the TsFile is not read while pipe memory is short.
+  *
+  * <p>The memory state is polled at the interval returned by {@code
+  * PipeConfig#getPipeTsFileParserCheckMemoryEnoughIntervalMs()}. There is no timeout: the wait
+  * ends only when the check succeeds or the thread is interrupted. While waiting, a progress
+  * message is logged periodically so that a long stall is visible in the logs.
+  *
+  * @throws InterruptedException if the thread is interrupted while sleeping between two checks
+  */
+ private void waitForResourceEnough4Parsing() throws InterruptedException {
+   final PipeMemoryManager memoryManager = PipeDataNodeResourceManager.memory();
+   if (memoryManager.isEnough4TabletParsing()) {
+     // Fast path: nothing to wait for, and nothing worth logging.
+     return;
+   }
+
+   // Thread.sleep rejects negative values and a zero interval would busy-spin, so clamp a
+   // misconfigured interval to at least 1 ms.
+   final long memoryCheckIntervalMs =
+       Math.max(1L, PipeConfig.getInstance().getPipeTsFileParserCheckMemoryEnoughIntervalMs());
+   final long progressLogIntervalMs = 10 * 1000L;
+   final long startTime = System.currentTimeMillis();
+   long lastProgressLogTime = startTime;
+   while (!memoryManager.isEnough4TabletParsing()) {
+     Thread.sleep(memoryCheckIntervalMs);
+
+     // The wait is unbounded, so report periodically instead of staying silent until it ends.
+     final long now = System.currentTimeMillis();
+     if (now - lastProgressLogTime >= progressLogIntervalMs) {
+       LOGGER.info(
+           "Still waiting for resource enough for parsing {}, {} seconds elapsed.",
+           resource != null ? resource.getTsFilePath() : "tsfile",
+           (now - startTime) / 1000.0);
+       lastProgressLogTime = now;
+     }
+   }
+
+   // Waits longer than one second are reported at INFO; shorter ones only at DEBUG.
+   final double waitTimeSeconds = (System.currentTimeMillis() - startTime) / 1000.0;
+   if (waitTimeSeconds > 1.0) {
+     LOGGER.info(
+         "Wait for resource enough for parsing {} for {} seconds.",
+         resource != null ? resource.getTsFilePath() : "tsfile",
+         waitTimeSeconds);
+   } else if (LOGGER.isDebugEnabled()) {
+     LOGGER.debug(
+         "Wait for resource enough for parsing {} for {} seconds.",
+         resource != null ? resource.getTsFilePath() : "tsfile",
+         waitTimeSeconds);
+   }
+ }
+
/** The method is used to prevent circular replication in PipeConsensus */
public boolean isGeneratedByPipeConsensus() {
return isGeneratedByPipeConsensus;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 09077a87a3a..f0caf0a631b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -73,6 +73,11 @@ public class PipeMemoryManager {
PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
}
+ /**
+  * Tells whether the memory currently used by tablets is low enough to start parsing more of
+  * them.
+  *
+  * <p>The limit is 95% of {@code TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES}.
+  *
+  * @return {@code true} if {@code usedMemorySizeInBytesOfTablets} is strictly below that limit
+  */
+ public boolean isEnough4TabletParsing() {
+   final double usedByTabletsInBytes = (double) usedMemorySizeInBytesOfTablets;
+   final double parsingLimitInBytes =
+       0.95 * TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES;
+   return usedByTabletsInBytes < parsingLimitInBytes;
+ }
+
public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
return forceAllocate(sizeInBytes, PipeMemoryBlockType.NORMAL);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 32282e11029..b3162ee7567 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -261,6 +261,7 @@ public class CommonConfig {
private long pipeMemoryAllocateMinSizeInBytes = 32;
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 *
1024 * 1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+ private volatile long pipeTsFileParserCheckMemoryEnoughIntervalMs = 10L;
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
@@ -1067,6 +1068,15 @@ public class CommonConfig {
this.pipeMemoryExpanderIntervalSeconds = pipeMemoryExpanderIntervalSeconds;
}
+ /**
+  * Returns the interval, in milliseconds, between two checks of whether memory is sufficient for
+  * the pipe TsFile parser.
+  *
+  * @return the currently configured check interval in milliseconds
+  */
+ public long getPipeTsFileParserCheckMemoryEnoughIntervalMs() {
+   return pipeTsFileParserCheckMemoryEnoughIntervalMs;
+ }
+
+ /**
+  * Sets the interval, in milliseconds, between two checks of whether memory is sufficient for
+  * the pipe TsFile parser. The value is stored as given; no range validation is done here.
+  *
+  * @param pipeTsFileParserCheckMemoryEnoughIntervalMs the new check interval in milliseconds
+  */
+ public void setPipeTsFileParserCheckMemoryEnoughIntervalMs(
+     long pipeTsFileParserCheckMemoryEnoughIntervalMs) {
+   this.pipeTsFileParserCheckMemoryEnoughIntervalMs = pipeTsFileParserCheckMemoryEnoughIntervalMs;
+ }
+
public int getPipeMemoryAllocateMaxRetries() {
return pipeMemoryAllocateMaxRetries;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 09fb03bbbde..accbdc89895 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -561,6 +561,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_memory_expander_interval_seconds",
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
+ config.setPipeTsFileParserCheckMemoryEnoughIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_tsfile_parser_check_memory_enough_interval_ms",
+
String.valueOf(config.getPipeTsFileParserCheckMemoryEnoughIntervalMs()))));
config.setPipeLeaderCacheMemoryUsagePercentage(
Float.parseFloat(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index caa40e67189..c41841264fa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -298,6 +298,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
}
+ /**
+  * Returns the interval, in milliseconds, between two checks of whether memory is sufficient for
+  * the pipe TsFile parser, as held by {@code COMMON_CONFIG}.
+  *
+  * @return the check interval in milliseconds
+  */
+ public long getPipeTsFileParserCheckMemoryEnoughIntervalMs() {
+   return COMMON_CONFIG.getPipeTsFileParserCheckMemoryEnoughIntervalMs();
+ }
+
/////////////////////////////// TwoStage ///////////////////////////////
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -446,6 +450,9 @@ public class PipeConfig {
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
LOGGER.info("PipeMemoryExpanderIntervalSeconds: {}",
getPipeMemoryExpanderIntervalSeconds());
+ LOGGER.info(
+ "PipeTsFileParserCheckMemoryEnoughIntervalMs: {}",
+ getPipeTsFileParserCheckMemoryEnoughIntervalMs());
LOGGER.info(
"TwoStageAggregateMaxCombinerLiveTimeInMs: {}",