This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e99fc445663 Pipe: Optimize Batch and WAL memory allocation algorithms 
(#15534)
e99fc445663 is described below

commit e99fc44566325a5b72f65d35d7f47e34367f65d1
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed May 28 18:50:14 2025 +0800

    Pipe: Optimize Batch and WAL memory allocation algorithms (#15534)
---
 .../evolvable/batch/PipeTabletEventBatch.java      |  34 ++---
 .../resource/memory/PipeDynamicMemoryBlock.java    | 156 +++++++++++++++++++++
 .../pipe/resource/memory/PipeMemoryBlockType.java  |   2 +
 .../db/pipe/resource/memory/PipeMemoryManager.java |  54 +++++++
 .../resource/memory/PipeModelFixedMemoryBlock.java | 125 +++++++++++++++++
 .../DynamicMemoryAllocationStrategy.java}          |  17 ++-
 .../strategy/ThresholdAllocationStrategy.java      | 134 ++++++++++++++++++
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  96 +++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 115 ++++++++++++++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  41 ++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  42 ++++++
 11 files changed, 751 insertions(+), 65 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index d207ec9018d..9bff91c50ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -22,7 +22,9 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -38,6 +40,11 @@ import java.util.Objects;
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
+  private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK 
=
+      PipeDataNodeResourceManager.memory()
+          .forceAllocateForModelFixedMemoryBlock(
+              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
+              PipeMemoryBlockType.BATCH);
 
   protected final List<EnrichedEvent> events = new ArrayList<>();
 
@@ -45,7 +52,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   private long firstEventProcessingTime = Long.MIN_VALUE;
 
   protected long totalBufferSize = 0;
-  private final PipeMemoryBlock allocatedMemoryBlock;
+  private final PipeDynamicMemoryBlock allocatedMemoryBlock;
 
   protected volatile boolean isClosed = false;
 
@@ -54,19 +61,8 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
     // limit in buffer size
     this.allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(requestMaxBatchSizeInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
-            .setShrinkCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has shrunk from {} to {}.", 
oldMemory, newMemory))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestMaxBatchSizeInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has expanded from {} to {}.", 
oldMemory, newMemory));
+        
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
+    allocatedMemoryBlock.setExpandable(false);
 
     if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
       LOGGER.info(
@@ -131,8 +127,12 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
       throws WALPipeException, IOException;
 
   public boolean shouldEmit() {
-    return totalBufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+    final long diff = System.currentTimeMillis() - firstEventProcessingTime;
+    if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
+      allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) 
diff / maxDelayInMs);
+      return true;
+    }
+    return false;
   }
 
   private long getMaxBatchSizeInBytes() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
new file mode 100644
index 00000000000..4e33b871828
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.tsfile.utils.Pair;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+public class PipeDynamicMemoryBlock {
+
+  private final PipeModelFixedMemoryBlock fixedMemoryBlock;
+
+  private boolean isExpandable = true;
+
+  private Consumer<PipeDynamicMemoryBlock> expand = null;
+
+  private volatile boolean released = false;
+
+  private volatile long memoryUsageInBytes;
+
+  private volatile double historyMemoryEfficiency;
+
+  private volatile double currentMemoryEfficiency;
+
+  PipeDynamicMemoryBlock(
+      final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long 
memoryUsageInBytes) {
+    this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+    this.fixedMemoryBlock = fixedMemoryBlock;
+  }
+
+  public long getMemoryUsageInBytes() {
+    return memoryUsageInBytes;
+  }
+
+  public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
+    this.memoryUsageInBytes = memoryUsageInBytes;
+  }
+
+  public Pair<Double, Double> getMemoryEfficiency() {
+    synchronized (fixedMemoryBlock) {
+      return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency);
+    }
+  }
+
+  public void setExpandable(boolean expandable) {
+    isExpandable = expandable;
+  }
+
+  public void setExpand(Consumer<PipeDynamicMemoryBlock> expand) {
+    this.expand = expand;
+  }
+
+  public double getMemoryBlockUsageRatio() {
+    return (double) memoryUsageInBytes / 
fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public double getFixedMemoryBlockUsageRatio() {
+    return (double) fixedMemoryBlock.getMemoryAllocatedInBytes()
+        / fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public long canAllocateMemorySize() {
+    return fixedMemoryBlock.getMemoryUsageInBytes() - 
fixedMemoryBlock.getMemoryAllocatedInBytes();
+  }
+
+  public synchronized long getExpectedAverageAllocatedMemorySize() {
+    return fixedMemoryBlock.getMemoryUsageInBytes() / 
fixedMemoryBlock.getMemoryBlocks().size();
+  }
+
+  public void updateCurrentMemoryEfficiencyAdjustMem(double 
currentMemoryEfficiency) {
+    synchronized (fixedMemoryBlock) {
+      this.historyMemoryEfficiency = this.currentMemoryEfficiency;
+      if (Double.isNaN(currentMemoryEfficiency)
+          || Double.isInfinite(currentMemoryEfficiency)
+          || currentMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+      this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+      fixedMemoryBlock.dynamicallyAdjustMemory(this);
+    }
+  }
+
+  public long getFixedMemoryCapacity() {
+    return fixedMemoryBlock.getMemoryUsageInBytes();
+  }
+
+  public void updateMemoryEfficiency(
+      double currentMemoryEfficiency, double historyMemoryEfficiency) {
+    synchronized (fixedMemoryBlock) {
+      if (Double.isNaN(currentMemoryEfficiency)
+          || Double.isInfinite(currentMemoryEfficiency)
+          || currentMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+
+      if (Double.isNaN(historyMemoryEfficiency)
+          || Double.isInfinite(historyMemoryEfficiency)
+          || historyMemoryEfficiency < 0.0) {
+        currentMemoryEfficiency = 0.0;
+      }
+
+      this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
+      this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
+    }
+  }
+
+  public Stream<PipeDynamicMemoryBlock> getMemoryBlocks() {
+    return fixedMemoryBlock.getMemoryBlocksStream();
+  }
+
+  public void applyForDynamicMemory(final long memoryUsageInBytes) {
+    fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes);
+  }
+
+  public boolean isReleased() {
+    return released;
+  }
+
+  public void close() {
+    if (released) {
+      return;
+    }
+    synchronized (fixedMemoryBlock) {
+      if (!released) {
+        fixedMemoryBlock.releaseMemory(this);
+        released = true;
+      }
+    }
+  }
+
+  void doExpand() {
+    if (isExpandable && expand != null) {
+      expand.accept(this);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
index 5b626df04c3..846fc7dd1ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
@@ -23,4 +23,6 @@ public enum PipeMemoryBlockType {
   NORMAL,
   TABLET,
   TS_FILE,
+  BATCH,
+  WAL
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index a98338dd417..90e168a8f9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,6 +101,18 @@ public class PipeMemoryManager {
         * getTotalNonFloatingMemorySizeInBytes();
   }
 
+  public long getAllocatedMemorySizeInBytesOfWAL() {
+    return (long)
+        (PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()
+            * getTotalNonFloatingMemorySizeInBytes());
+  }
+
+  public long getAllocatedMemorySizeInBytesOfBatch() {
+    return (long)
+        (PIPE_CONFIG.getPipeDataStructureBatchMemoryProportion()
+            * getTotalNonFloatingMemorySizeInBytes());
+  }
+
   public boolean isEnough4TabletParsing() {
     return (double) usedMemorySizeInBytesOfTablets + (double) 
usedMemorySizeInBytesOfTsFiles
             < EXCEED_PROTECT_THRESHOLD * 
allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
@@ -225,6 +238,39 @@ public class PipeMemoryManager {
     }
   }
 
+  public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock(
+      long fixedSizeInBytes, PipeMemoryBlockType type)
+      throws PipeRuntimeOutOfMemoryCriticalException {
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
+      return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new 
ThresholdAllocationStrategy());
+    }
+
+    if (fixedSizeInBytes == 0) {
+      return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type);
+    }
+
+    for (int i = 1, size = PIPE_CONFIG.getPipeMemoryAllocateMaxRetries(); i <= 
size; i++) {
+      if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) {
+        break;
+      }
+
+      try {
+        Thread.sleep(PIPE_CONFIG.getPipeMemoryAllocateRetryIntervalInMs());
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for 
available memory", ex);
+      }
+    }
+
+    if (getFreeMemorySizeInBytes() < fixedSizeInBytes) {
+      return (PipeModelFixedMemoryBlock) 
forceAllocateWithRetry(getFreeMemorySizeInBytes(), type);
+    }
+
+    synchronized (this) {
+      return (PipeModelFixedMemoryBlock) 
forceAllocateWithRetry(fixedSizeInBytes, type);
+    }
+  }
+
   private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, 
PipeMemoryBlockType type)
       throws PipeRuntimeOutOfMemoryCriticalException {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -233,6 +279,9 @@ public class PipeMemoryManager {
           return new PipeTabletMemoryBlock(sizeInBytes);
         case TS_FILE:
           return new PipeTsFileMemoryBlock(sizeInBytes);
+        case BATCH:
+        case WAL:
+          return new PipeModelFixedMemoryBlock(sizeInBytes, new 
ThresholdAllocationStrategy());
         default:
           return new PipeMemoryBlock(sizeInBytes);
       }
@@ -466,6 +515,11 @@ public class PipeMemoryManager {
       case TS_FILE:
         returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
         break;
+      case BATCH:
+      case WAL:
+        returnedMemoryBlock =
+            new PipeModelFixedMemoryBlock(sizeInBytes, new 
ThresholdAllocationStrategy());
+        break;
       default:
         returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
         break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
new file mode 100644
index 00000000000..647fb81a4b9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeModelFixedMemoryBlock.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import 
org.apache.iotdb.db.pipe.resource.memory.strategy.DynamicMemoryAllocationStrategy;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+public class PipeModelFixedMemoryBlock extends PipeFixedMemoryBlock {
+
+  private final Set<PipeDynamicMemoryBlock> memoryBlocks =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  private final DynamicMemoryAllocationStrategy allocationStrategy;
+
+  private volatile long memoryAllocatedInBytes;
+
+  public PipeModelFixedMemoryBlock(
+      final long memoryUsageInBytes, final DynamicMemoryAllocationStrategy 
allocationStrategy) {
+    super(memoryUsageInBytes);
+    this.memoryAllocatedInBytes = 0;
+    this.allocationStrategy = allocationStrategy;
+  }
+
+  public synchronized PipeDynamicMemoryBlock registerPipeBatchMemoryBlock(
+      final long memorySizeInBytes) {
+    final PipeDynamicMemoryBlock memoryBlock = new 
PipeDynamicMemoryBlock(this, 0);
+    memoryBlocks.add(memoryBlock);
+    if (memorySizeInBytes != 0) {
+      resetMemoryBlockSize(memoryBlock, memorySizeInBytes);
+      double e = (double) getMemoryUsageInBytes() / memorySizeInBytes;
+      memoryBlock.updateMemoryEfficiency(e, e);
+      return memoryBlock;
+    }
+
+    memoryBlock.updateMemoryEfficiency(0.0, 0.0);
+    return memoryBlock;
+  }
+
+  @Override
+  public synchronized boolean expand() {
+    // Ensure that the memory block that gets most of the memory is released first, which can reduce
+    // the jitter of memory allocation. If the memory block is not expanded, it will not be expanded
+    // again. This function not only completes the expansion but also the reduction.
+    memoryBlocks.stream()
+        .sorted((a, b) -> Long.compare(b.getMemoryUsageInBytes(), 
a.getMemoryUsageInBytes()))
+        .forEach(PipeDynamicMemoryBlock::doExpand);
+    return false;
+  }
+
+  public long getMemoryAllocatedInBytes() {
+    return memoryAllocatedInBytes;
+  }
+
+  public synchronized Set<PipeDynamicMemoryBlock> getMemoryBlocks() {
+    return memoryBlocks;
+  }
+
+  synchronized void releaseMemory(final PipeDynamicMemoryBlock memoryBlock) {
+    resetMemoryBlockSize(memoryBlock, 0);
+    memoryBlocks.remove(memoryBlock);
+  }
+
+  synchronized void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock 
block) {
+    if (this.isReleased() || block.isReleased() || 
!memoryBlocks.contains(block)) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+    allocationStrategy.dynamicallyAdjustMemory(block);
+  }
+
+  synchronized void resetMemoryBlockSize(
+      final PipeDynamicMemoryBlock block, final long memorySizeInBytes) {
+    if (this.isReleased() || block.isReleased() || 
!memoryBlocks.contains(block)) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+
+    final long diff = memorySizeInBytes - block.getMemoryUsageInBytes();
+
+    // If the capacity is expanded, determine whether it will exceed the 
maximum value of the fixed
+    // module
+    if (getMemoryUsageInBytes() - memoryAllocatedInBytes < diff) {
+      // Pay attention to the order of calls, otherwise it will cause resource 
leakage
+      block.setMemoryUsageInBytes(
+          block.getMemoryUsageInBytes() + getMemoryUsageInBytes() - 
memoryAllocatedInBytes);
+      memoryAllocatedInBytes = getMemoryUsageInBytes();
+      return;
+    }
+
+    memoryAllocatedInBytes = memoryAllocatedInBytes + diff;
+    block.setMemoryUsageInBytes(memorySizeInBytes);
+  }
+
+  Stream<PipeDynamicMemoryBlock> getMemoryBlocksStream() {
+    if (isReleased()) {
+      throw new IllegalStateException("The memory block has been released");
+    }
+    return memoryBlocks.stream();
+  }
+
+  @Override
+  public synchronized void close() {
+    memoryBlocks.forEach(PipeDynamicMemoryBlock::close);
+    super.close();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
index 5b626df04c3..8e5ba9af053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/DynamicMemoryAllocationStrategy.java
@@ -17,10 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource.memory;
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
 
-public enum PipeMemoryBlockType {
-  NORMAL,
-  TABLET,
-  TS_FILE,
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+// Memory behavior model: producers produce memory and consumers consume it. To ensure that
+// consumers do not encounter back pressure, the memory that consumers need is allocated in
+// advance. Consumer instances obtain their expected memory through an allocation strategy, and
+// the total memory of all consumer instances must not be greater than the pre-allocated memory.
+// The allocation algorithm adjusts the memory of each consumer so that the consumption rate
+// can reach the optimum.
+public interface DynamicMemoryAllocationStrategy {
+
+  void dynamicallyAdjustMemory(PipeDynamicMemoryBlock dynamicMemoryBlock);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
new file mode 100644
index 00000000000..72a390f799e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/strategy/ThresholdAllocationStrategy.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory.strategy;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// The algorithm is optimized for different scenarios. Its behavior in each scenario is as
+// follows:
+// 1. When the memory is large enough, it will try to allocate memory to the memory block.
+// 2. When the memory is insufficient, the algorithm will try to ensure that memory blocks with a
+// relatively large memory share release memory.
+public class ThresholdAllocationStrategy implements 
DynamicMemoryAllocationStrategy {
+
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+  @Override
+  public void dynamicallyAdjustMemory(final PipeDynamicMemoryBlock 
dynamicMemoryBlock) {
+    final double deficitRatio = calculateDeficitRatio(dynamicMemoryBlock);
+    final long oldMemoryUsageInBytes = 
dynamicMemoryBlock.getMemoryUsageInBytes();
+    final long expectedMemory = (long) (oldMemoryUsageInBytes / deficitRatio);
+    final double memoryBlockUsageRatio = 
dynamicMemoryBlock.getMemoryBlockUsageRatio();
+    final long maximumMemoryIncrease =
+        (long)
+            (dynamicMemoryBlock.getFixedMemoryCapacity()
+                * 
PIPE_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+
+    // Avoid overflow and infinite values
+    if (deficitRatio <= 0.0 || oldMemoryUsageInBytes == 0 || expectedMemory == 
0) {
+      dynamicMemoryBlock.applyForDynamicMemory(maximumMemoryIncrease);
+      final double efficiencyRatio =
+          (double) dynamicMemoryBlock.getMemoryUsageInBytes() / 
maximumMemoryIncrease;
+      dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      return;
+    }
+
+    // No matter what, give priority to applying for memory use, and adjust 
the memory size when the
+    // memory is insufficient
+    final double lowUsageThreshold =
+        PIPE_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+    if (dynamicMemoryBlock.getFixedMemoryBlockUsageRatio()
+        < 
PIPE_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()) {
+      if (deficitRatio >= 1.0) {
+        return;
+      }
+
+      final long maxAvailableMemory =
+          Math.min(expectedMemory, dynamicMemoryBlock.canAllocateMemorySize());
+      long newMemoryRequest;
+
+      // Large-share blocks grow 1.5x, small-share blocks 2x, so small blocks gain memory faster
+      if (memoryBlockUsageRatio > lowUsageThreshold) {
+        newMemoryRequest =
+            Math.min(oldMemoryUsageInBytes + oldMemoryUsageInBytes / 2, 
maxAvailableMemory);
+      } else {
+        newMemoryRequest = Math.min(oldMemoryUsageInBytes * 2, 
maxAvailableMemory);
+      }
+
+      dynamicMemoryBlock.applyForDynamicMemory(newMemoryRequest);
+      final double efficiencyRatio =
+          dynamicMemoryBlock.getMemoryUsageInBytes() / (double) expectedMemory;
+      dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      return;
+    }
+
+    // Entering this logic means that the memory is insufficient and the 
memory allocation needs to
+    // be adjusted
+    final AtomicBoolean isMemoryNotEnough = new AtomicBoolean(false);
+    final double averageDeficitRatio =
+        dynamicMemoryBlock
+            .getMemoryBlocks()
+            .mapToDouble(
+                block -> {
+                  double ratio = calculateDeficitRatio(block);
+                  if (block.getMemoryUsageInBytes() == 0 || ratio == 0.0) {
+                    isMemoryNotEnough.set(true);
+                  }
+                  return ratio;
+                })
+            .average()
+            .orElse(1.0);
+
+    final double adjustmentThreshold = 
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+    // When memory is insufficient, try to ensure that smaller memory blocks 
apply for less memory,
+    // and larger memory blocks release more memory.
+    final double diff =
+        isMemoryNotEnough.get() && averageDeficitRatio > 2 * 
adjustmentThreshold
+            ? averageDeficitRatio - deficitRatio - adjustmentThreshold
+            : averageDeficitRatio - deficitRatio;
+
+    if (Math.abs(diff) > 
PIPE_CONFIG.getPipeDynamicMemoryAdjustmentThreshold()) {
+      final long mem = (long) ((dynamicMemoryBlock.getMemoryUsageInBytes() / 
deficitRatio) * diff);
+      
dynamicMemoryBlock.applyForDynamicMemory(dynamicMemoryBlock.getMemoryUsageInBytes()
 + mem);
+      if (oldMemoryUsageInBytes != dynamicMemoryBlock.getMemoryUsageInBytes()) 
{
+        final double efficiencyRatio =
+            dynamicMemoryBlock.getMemoryUsageInBytes() / (double) 
expectedMemory;
+        dynamicMemoryBlock.updateMemoryEfficiency(efficiencyRatio, 
efficiencyRatio);
+      }
+    } else if (memoryBlockUsageRatio > lowUsageThreshold
+        && memoryBlockUsageRatio > 
dynamicMemoryBlock.getExpectedAverageAllocatedMemorySize()) {
+      // If there is insufficient memory, some memory must be released
+      dynamicMemoryBlock.applyForDynamicMemory(oldMemoryUsageInBytes / 2);
+      dynamicMemoryBlock.updateMemoryEfficiency(deficitRatio / 2, deficitRatio 
/ 2);
+    }
+  }
+
+  private double calculateDeficitRatio(final PipeDynamicMemoryBlock block) {
+    final Pair<Double, Double> memoryEfficiency = block.getMemoryEfficiency();
+    double pipeDynamicMemoryHistoryWeight = 
PIPE_CONFIG.getPipeDynamicMemoryHistoryWeight();
+    return (1 - pipeDynamicMemoryHistoryWeight) * memoryEfficiency.getRight()
+        + pipeDynamicMemoryHistoryWeight * memoryEfficiency.getLeft();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 0823c7e7b6e..20469f2b798 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
+import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
@@ -64,7 +66,14 @@ public class WALInsertNodeCache {
       IoTDBDescriptor.getInstance().getMemoryConfig();
   private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
-  private final PipeMemoryBlock allocatedMemoryBlock;
+  private static final PipeModelFixedMemoryBlock WAL_MODEL_FIXED_MEMORY =
+      PipeDataNodeResourceManager.memory()
+          .forceAllocateForModelFixedMemoryBlock(
+              
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
+              PipeMemoryBlockType.WAL);
+
+  private final PipeDynamicMemoryBlock memoryBlock;
+
   // Used to adjust the memory usage of the cache
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
   private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
@@ -87,28 +96,12 @@ public class WALInsertNodeCache {
                 0.5
                     * MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes()
                     / CONFIG.getDataRegionNum());
-    allocatedMemoryBlock =
-        PipeDataNodeResourceManager.memory()
-            .tryAllocate(requestedAllocateSize)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestedAllocateSize))
-            .setExpandCallback(
-                (oldMemory, newMemory) -> {
-                  memoryUsageCheatFactor.updateAndGet(
-                      factor -> factor / ((double) newMemory / oldMemory));
-                  isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
-                  LOGGER.info(
-                      "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.",
-                      dataRegionId,
-                      oldMemory,
-                      newMemory);
-                });
+    memoryBlock = WAL_MODEL_FIXED_MEMORY.registerPipeBatchMemoryBlock(requestedAllocateSize);
     isBatchLoadEnabled.set(
-        allocatedMemoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte());
+        memoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte());
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
+            .maximumWeight(requestedAllocateSize)
             .weigher(
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
                     (position, pair) -> {
@@ -129,30 +122,51 @@ public class WALInsertNodeCache {
                     })
             .recordStats()
             .build(new WALInsertNodeCacheLoader());
-    allocatedMemoryBlock.setShrinkCallback(
-        (oldMemory, newMemory) -> {
-          memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory / newMemory));
-          isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
-          LOGGER.info(
-              "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.",
-              dataRegionId,
-              oldMemory,
-              newMemory);
-          if (CONFIG.getWALCacheShrinkClearEnabled()) {
-            try {
-              lruCache.cleanUp();
-            } catch (Exception e) {
-              LOGGER.warn(
-                  "Failed to clear WALInsertNodeCache for dataRegion ID: {}.", dataRegionId, e);
-              return;
-            }
-            LOGGER.info(
-                "Successfully cleared WALInsertNodeCache for dataRegion ID: {}.", dataRegionId);
+
+    memoryBlock.setExpandable(true);
+    memoryBlock.setExpand(
+        memoryBlock -> {
+          final long oldMemory = memoryBlock.getMemoryUsageInBytes();
+          memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
+          final long newMemory = memoryBlock.getMemoryUsageInBytes();
+          if (newMemory > oldMemory) {
+            setExpandCallback(oldMemory, newMemory, dataRegionId);
+          } else if (newMemory < oldMemory) {
+            shrinkCallback(oldMemory, newMemory, dataRegionId);
           }
         });
     PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
 
+  private void setExpandCallback(long oldMemory, long newMemory, Integer dataRegionId) {
+    memoryUsageCheatFactor.updateAndGet(factor -> factor / ((double) newMemory / oldMemory));
+    isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
+    LOGGER.info(
+        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.",
+        dataRegionId,
+        oldMemory,
+        newMemory);
+  }
+
+  private void shrinkCallback(long oldMemory, long newMemory, Integer dataRegionId) {
+    memoryUsageCheatFactor.updateAndGet(factor -> factor * ((double) oldMemory / newMemory));
+    isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
+    LOGGER.info(
+        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.",
+        dataRegionId,
+        oldMemory,
+        newMemory);
+    if (CONFIG.getWALCacheShrinkClearEnabled()) {
+      try {
+        lruCache.cleanUp();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID: {}.", dataRegionId, e);
+        return;
+      }
+      LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID: {}.", dataRegionId);
+    }
+  }
+
   /////////////////////////// Getter & Setter ///////////////////////////
 
   public InsertNode getInsertNode(final WALEntryPosition position) {
@@ -378,7 +392,7 @@ public class WALInsertNodeCache {
   @TestOnly
   public void clear() {
     lruCache.invalidateAll();
-    allocatedMemoryBlock.close();
+    memoryBlock.close();
     memTablesNeedSearch.clear();
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c3f2e0f4a11..4a1c9ea104c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -216,8 +216,10 @@ public class CommonConfig {
 
   private int pipeDataStructureTabletRowSize = 2048;
   private int pipeDataStructureTabletSizeInBytes = 2097152;
-  private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.4;
-  private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.4;
+  private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.2;
+  private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.2;
+  private double pipeDataStructureWalMemoryProportion = 0.2;
+  private double PipeDataStructureBatchMemoryProportion = 0.2;
   private double pipeTotalFloatingMemoryProportion = 0.2;
 
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
@@ -302,6 +304,11 @@ public class CommonConfig {
   private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
       PipeRemainingTimeRateAverageTime.MEAN;
   private double pipeTsFileScanParsingThreshold = 0.05;
+  private double pipeDynamicMemoryHistoryWeight = 0.5;
+  private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
+  private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 0.1d;
+  private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
+  private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
@@ -841,6 +848,34 @@ public class CommonConfig {
         pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold);
   }
 
+  public double getPipeDataStructureWalMemoryProportion() {
+    return pipeDataStructureWalMemoryProportion;
+  }
+
+  public void setPipeDataStructureWalMemoryProportion(double pipeDataStructureWalMemoryProportion) {
+    if (this.pipeDataStructureWalMemoryProportion == pipeDataStructureWalMemoryProportion) {
+      return;
+    }
+    this.pipeDataStructureWalMemoryProportion = pipeDataStructureWalMemoryProportion;
+    logger.info(
+        "pipeDataStructureWalMemoryProportion is set to {}.", pipeDataStructureWalMemoryProportion);
+  }
+
+  public double getPipeDataStructureBatchMemoryProportion() {
+    return PipeDataStructureBatchMemoryProportion;
+  }
+
+  public void setPipeDataStructureBatchMemoryProportion(
+      double PipeDataStructureBatchMemoryProportion) {
+    if (this.PipeDataStructureBatchMemoryProportion == PipeDataStructureBatchMemoryProportion) {
+      return;
+    }
+    this.PipeDataStructureBatchMemoryProportion = PipeDataStructureBatchMemoryProportion;
+    logger.info(
+        "PipeDataStructureBatchMemoryProportion is set to {}.",
+        PipeDataStructureBatchMemoryProportion);
+  }
+
   public double getPipeTotalFloatingMemoryProportion() {
     return pipeTotalFloatingMemoryProportion;
   }
@@ -1824,6 +1859,82 @@ public class CommonConfig {
     logger.info("pipeTsFileScanParsingThreshold is set to {}", pipeTsFileScanParsingThreshold);
   }
 
+  public double getPipeDynamicMemoryHistoryWeight() {
+    return pipeDynamicMemoryHistoryWeight;
+  }
+
+  public void setPipeDynamicMemoryHistoryWeight(double pipeDynamicMemoryHistoryWeight) {
+    if (this.pipeDynamicMemoryHistoryWeight == pipeDynamicMemoryHistoryWeight) {
+      return;
+    }
+    this.pipeDynamicMemoryHistoryWeight = pipeDynamicMemoryHistoryWeight;
+    logger.info("PipeDynamicMemoryHistoryWeight is set to {}", pipeDynamicMemoryHistoryWeight);
+  }
+
+  public double getPipeDynamicMemoryAdjustmentThreshold() {
+    return pipeDynamicMemoryAdjustmentThreshold;
+  }
+
+  public void setPipeDynamicMemoryAdjustmentThreshold(double pipeDynamicMemoryAdjustmentThreshold) {
+    if (this.pipeDynamicMemoryAdjustmentThreshold == pipeDynamicMemoryAdjustmentThreshold) {
+      return;
+    }
+    this.pipeDynamicMemoryAdjustmentThreshold = pipeDynamicMemoryAdjustmentThreshold;
+    logger.info(
+        "pipeDynamicMemoryAdjustmentThreshold is set to {}", pipeDynamicMemoryAdjustmentThreshold);
+  }
+
+  public double getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+    return pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+  }
+
+  public void setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+      double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+    if (this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio
+        == pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio =
+        pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio;
+    logger.info(
+        "pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio is set to {}",
+        pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio);
+  }
+
+  public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+    return pipeThresholdAllocationStrategyLowUsageThreshold;
+  }
+
+  public void setPipeThresholdAllocationStrategyLowUsageThreshold(
+      double pipeThresholdAllocationStrategyLowUsageThreshold) {
+    if (this.pipeThresholdAllocationStrategyLowUsageThreshold
+        == pipeThresholdAllocationStrategyLowUsageThreshold) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyLowUsageThreshold =
+        pipeThresholdAllocationStrategyLowUsageThreshold;
+    logger.info(
+        "pipeMemoryBlockLowUsageThreshold is set to {}",
+        pipeThresholdAllocationStrategyLowUsageThreshold);
+  }
+
+  public double getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+    return pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+  }
+
+  public void setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+      double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+    if (this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold
+        == pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold) {
+      return;
+    }
+    this.pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold =
+        pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold;
+    logger.info(
+        "pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold is set to {}",
+        pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
+  }
+
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 851ed8c4b5d..b10d5580f73 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -79,6 +79,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
   }
 
+  public double getPipeDataStructureWalMemoryProportion() {
+    return COMMON_CONFIG.getPipeDataStructureWalMemoryProportion();
+  }
+
+  public double getPipeDataStructureBatchMemoryProportion() {
+    return COMMON_CONFIG.getPipeDataStructureBatchMemoryProportion();
+  }
+
   public double getPipeTotalFloatingMemoryProportion() {
     return COMMON_CONFIG.getPipeTotalFloatingMemoryProportion();
   }
@@ -223,6 +231,26 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
   }
 
+  public double getPipeDynamicMemoryHistoryWeight() {
+    return COMMON_CONFIG.getPipeDynamicMemoryHistoryWeight();
+  }
+
+  public double getPipeDynamicMemoryAdjustmentThreshold() {
+    return COMMON_CONFIG.getPipeDynamicMemoryAdjustmentThreshold();
+  }
+
+  public double getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio() {
+    return COMMON_CONFIG.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio();
+  }
+
+  public double getPipeThresholdAllocationStrategyLowUsageThreshold() {
+    return COMMON_CONFIG.getPipeThresholdAllocationStrategyLowUsageThreshold();
+  }
+
+  public double getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold() {
+    return COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
+  }
+
   /////////////////////////////// Meta Consistency ///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -487,6 +515,19 @@ public class PipeConfig {
         "PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());
 
+    LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
+    LOGGER.info(
+        "PipeDynamicMemoryAdjustmentThreshold: {}", getPipeDynamicMemoryAdjustmentThreshold());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyMaximumMemoryIncrementRatio: {}",
+        getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyLowUsageThreshold: {}",
+        getPipeThresholdAllocationStrategyLowUsageThreshold());
+    LOGGER.info(
+        "PipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold: {}",
+        getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
+
     LOGGER.info(
         "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
         getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index f25c285ae5a..ea2829172ce 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -228,6 +228,16 @@ public class PipeDescriptor {
                 "pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
                 String.valueOf(
                     config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
+    config.setPipeDataStructureWalMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_data_structure_wal_memory_proportion",
+                String.valueOf(config.getPipeDataStructureWalMemoryProportion()))));
+    config.setPipeDataStructureBatchMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_data_structure_batch_memory_proportion",
+                String.valueOf(config.getPipeDataStructureBatchMemoryProportion()))));
     config.setPipeTotalFloatingMemoryProportion(
         Double.parseDouble(
             properties.getProperty(
@@ -520,6 +530,38 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_tsfile_scan_parsing_threshold",
                 String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
+
+    config.setPipeDynamicMemoryHistoryWeight(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_dynamic_memory_history_weight",
+                String.valueOf(config.getPipeDynamicMemoryHistoryWeight()))));
+
+    config.setPipeDynamicMemoryAdjustmentThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_dynamic_memory_adjustment_threshold",
+                String.valueOf(config.getPipeDynamicMemoryAdjustmentThreshold()))));
+
+    config.setPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_threshold_allocation_strategy_maximum_memory_increment_ratio",
+                String.valueOf(
+                    config.getPipeThresholdAllocationStrategyMaximumMemoryIncrementRatio()))));
+
+    config.setPipeThresholdAllocationStrategyLowUsageThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_threshold_allocation_strategy_low_usage_threshold",
+                String.valueOf(config.getPipeThresholdAllocationStrategyLowUsageThreshold()))));
+
+    config.setPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_threshold_allocation_strategy_high_usage_threshold",
+                String.valueOf(
+                    config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
   }
 
   public static void loadPipeExternalConfig(

Reply via email to