This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new bd6e6751e8c Pipe: Avoid the file reading when the memory is not enough for tablet parsing (#13954) (#13988)
bd6e6751e8c is described below

commit bd6e6751e8c9fb223e6b7143942c39bb938ca001
Author: Caideyipi <[email protected]>
AuthorDate: Tue Nov 5 15:03:02 2024 +0800

    Pipe: Avoid the file reading when the memory is not enough for tablet parsing (#13954) (#13988)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 30 ++++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryManager.java |  5 ++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 ++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 +++++
 5 files changed, 57 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9e7a8d421e3..699dceea669 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -30,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDat
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -401,6 +403,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
             "Pipe skipping temporary TsFile's parsing which shouldn't be 
transferred: {}", tsFile);
         return Collections.emptyList();
       }
+      waitForResourceEnough4Parsing();
       return initDataContainer().toTabletInsertionEvents();
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -414,6 +417,33 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
     }
   }
 
+  /**
+   * Blocks the calling thread until the pipe memory manager reports that enough memory is
+   * available for tablet parsing, re-checking after each sleep of the configured interval.
+   *
+   * <p>Returns immediately, without logging, when memory is already sufficient on entry.
+   * Otherwise the elapsed wait is logged at INFO level when it exceeds one second, and at DEBUG
+   * level (only if DEBUG is enabled) when it does not.
+   *
+   * @throws InterruptedException if the thread is interrupted while sleeping between checks
+   */
+  private void waitForResourceEnough4Parsing() throws InterruptedException {
+    final PipeMemoryManager pipeMemory = PipeDataNodeResourceManager.memory();
+    if (pipeMemory.isEnough4TabletParsing()) {
+      return;
+    }
+
+    final long pollIntervalMs =
+        PipeConfig.getInstance().getPipeTsFileParserCheckMemoryEnoughIntervalMs();
+    final long waitBeganAtMs = System.currentTimeMillis();
+    while (true) {
+      // Check before every sleep so no time is spent sleeping once memory has been freed.
+      if (pipeMemory.isEnough4TabletParsing()) {
+        break;
+      }
+      Thread.sleep(pollIntervalMs);
+    }
+
+    final double waitedSeconds = (System.currentTimeMillis() - waitBeganAtMs) / 1000.0;
+    final boolean waitedLong = waitedSeconds > 1.0;
+    if (!waitedLong && !LOGGER.isDebugEnabled()) {
+      return;
+    }
+
+    // The file path is looked up only on the paths that actually issue a log call.
+    final Object parsedFile = resource != null ? resource.getTsFilePath() : "tsfile";
+    final String message = "Wait for resource enough for parsing {} for {} seconds.";
+    if (waitedLong) {
+      LOGGER.info(message, parsedFile, waitedSeconds);
+    } else {
+      LOGGER.debug(message, parsedFile, waitedSeconds);
+    }
+  }
+
   /** The method is used to prevent circular replication in PipeConsensus */
   public boolean isGeneratedByPipeConsensus() {
     return isGeneratedByPipeConsensus;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 09077a87a3a..f0caf0a631b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -73,6 +73,11 @@ public class PipeMemoryManager {
             PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
   }
 
+  /**
+   * Reports whether tablet memory usage is low enough to start parsing tablets.
+   *
+   * @return {@code true} if {@code usedMemorySizeInBytesOfTablets} is strictly below 95% of
+   *     {@code TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES}
+   */
+  public boolean isEnough4TabletParsing() {
+    // Read the usage counter first, matching the left-to-right order of the comparison.
+    final double usedByTablets = usedMemorySizeInBytesOfTablets;
+    final double parsingLimit =
+        0.95 * TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES;
+    return usedByTablets < parsingLimit;
+  }
+
   public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
       throws PipeRuntimeOutOfMemoryCriticalException {
     return forceAllocate(sizeInBytes, PipeMemoryBlockType.NORMAL);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 32282e11029..b3162ee7567 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -261,6 +261,7 @@ public class CommonConfig {
   private long pipeMemoryAllocateMinSizeInBytes = 32;
   private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 
1024 * 1024; // 2MB
   private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+  private volatile long pipeTsFileParserCheckMemoryEnoughIntervalMs = 10L;
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
   private long pipeListeningQueueTransferSnapshotThreshold = 1000;
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
@@ -1067,6 +1068,15 @@ public class CommonConfig {
     this.pipeMemoryExpanderIntervalSeconds = pipeMemoryExpanderIntervalSeconds;
   }
 
+  /**
+   * Returns the current value of {@code pipeTsFileParserCheckMemoryEnoughIntervalMs}, the sleep
+   * interval in milliseconds used between checks while waiting for enough memory to parse a
+   * TsFile.
+   */
+  public long getPipeTsFileParserCheckMemoryEnoughIntervalMs() {
+    return pipeTsFileParserCheckMemoryEnoughIntervalMs;
+  }
+
+  /**
+   * Replaces the value of {@code pipeTsFileParserCheckMemoryEnoughIntervalMs}.
+   *
+   * <p>The supplied value is stored as-is; this method performs no validation.
+   *
+   * @param pipeTsFileParserCheckMemoryEnoughIntervalMs the new interval value to store
+   */
+  public void setPipeTsFileParserCheckMemoryEnoughIntervalMs(
+      long pipeTsFileParserCheckMemoryEnoughIntervalMs) {
+    this.pipeTsFileParserCheckMemoryEnoughIntervalMs = 
pipeTsFileParserCheckMemoryEnoughIntervalMs;
+  }
+
   public int getPipeMemoryAllocateMaxRetries() {
     return pipeMemoryAllocateMaxRetries;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 09fb03bbbde..accbdc89895 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -561,6 +561,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_memory_expander_interval_seconds",
                 
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
+    config.setPipeTsFileParserCheckMemoryEnoughIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_tsfile_parser_check_memory_enough_interval_ms",
+                
String.valueOf(config.getPipeTsFileParserCheckMemoryEnoughIntervalMs()))));
     config.setPipeLeaderCacheMemoryUsagePercentage(
         Float.parseFloat(
             properties.getProperty(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index caa40e67189..c41841264fa 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -298,6 +298,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
   }
 
+  /**
+   * Returns the TsFile-parser memory check interval by delegating to
+   * {@code COMMON_CONFIG.getPipeTsFileParserCheckMemoryEnoughIntervalMs()}.
+   */
+  public long getPipeTsFileParserCheckMemoryEnoughIntervalMs() {
+    return COMMON_CONFIG.getPipeTsFileParserCheckMemoryEnoughIntervalMs();
+  }
+
   /////////////////////////////// TwoStage ///////////////////////////////
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
@@ -446,6 +450,9 @@ public class PipeConfig {
         "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
         getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
     LOGGER.info("PipeMemoryExpanderIntervalSeconds: {}", 
getPipeMemoryExpanderIntervalSeconds());
+    LOGGER.info(
+        "PipeTsFileParserCheckMemoryEnoughIntervalMs: {}",
+        getPipeTsFileParserCheckMemoryEnoughIntervalMs());
 
     LOGGER.info(
         "TwoStageAggregateMaxCombinerLiveTimeInMs: {}",

Reply via email to