This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c9186525d Pipe: Enable dynamic adjusting of allocated pipe memory 
(#11518)
29c9186525d is described below

commit 29c9186525d8397359f1644c7955eca8ac7a2899
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 13 21:55:26 2023 +0800

    Pipe: Enable dynamic adjusting of allocated pipe memory (#11518)
    
    Allocated pipe memory blocks now shrink automatically when a new
allocation cannot be satisfied because free space is insufficient, and expand
periodically when enough total space is available.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/agent/runtime/PipeCronEventInjector.java  |  72 -----------
 .../agent/runtime/PipePeriodicalJobExecutor.java   | 100 +++++++++++++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   7 +-
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |   2 +-
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |   2 +-
 .../builder/PipeTransferBatchReqBuilder.java       |  30 ++++-
 .../realtime/assigner/DisruptorQueue.java          |   2 +-
 .../db/pipe/resource/memory/PipeMemoryBlock.java   | 137 ++++++++++++++++++++-
 .../db/pipe/resource/memory/PipeMemoryManager.java | 115 ++++++++++++++---
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  62 ++++++++--
 .../iotdb/commons/concurrent/ThreadName.java       |   4 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   4 +
 14 files changed, 430 insertions(+), 121 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
deleted file mode 100644
index 2198d7608b4..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class PipeCronEventInjector {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeCronEventInjector.class);
-
-  private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
-      
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
-
-  private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_RUNTIME_CRON_EVENT_INJECTOR.getName());
-
-  private Future<?> injectorFuture;
-
-  public synchronized void start() {
-    if (injectorFuture == null) {
-      injectorFuture =
-          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-              CRON_EVENT_INJECTOR_EXECUTOR,
-              this::inject,
-              CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
-              CRON_EVENT_INJECTOR_INTERVAL_SECONDS,
-              TimeUnit.SECONDS);
-      LOGGER.info("Pipe cron event injector is started successfully.");
-    }
-  }
-
-  private synchronized void inject() {
-    PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
-  }
-
-  public synchronized void stop() {
-    if (injectorFuture != null) {
-      injectorFuture.cancel(false);
-      injectorFuture = null;
-      LOGGER.info("Pipe cron event injector is stopped successfully.");
-    }
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
new file mode 100644
index 00000000000..df9240cabe4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single thread to execute pipe periodical jobs on dataNode. This is for 
limiting the thread num on
+ * the DataNode instance.
+ */
+public class PipePeriodicalJobExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
+
+  private static final ScheduledExecutorService PERIODICAL_JOB_EXECUTOR =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
+
+  private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+      
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
+  private long cronEventInjectRoundsInterval;
+
+  private static final long MEMORY_EXPANDER_INTERVAL_SECONDS =
+      PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds();
+  private long memoryExpandRoundsInterval;
+
+  // Currently we use the CRON_EVENT_INJECTOR_INTERVAL_SECONDS as minimum 
interval
+  private static final long EXECUTOR_INTERVAL_SECONDS = 
CRON_EVENT_INJECTOR_INTERVAL_SECONDS;
+  private long rounds;
+
+  private Future<?> executorFuture;
+
+  public synchronized void start() {
+    if (executorFuture == null) {
+      rounds = 0;
+      cronEventInjectRoundsInterval =
+          Math.max(CRON_EVENT_INJECTOR_INTERVAL_SECONDS / 
EXECUTOR_INTERVAL_SECONDS, 1);
+      memoryExpandRoundsInterval =
+          Math.max(MEMORY_EXPANDER_INTERVAL_SECONDS / 
EXECUTOR_INTERVAL_SECONDS, 1);
+
+      executorFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              PERIODICAL_JOB_EXECUTOR,
+              this::execute,
+              EXECUTOR_INTERVAL_SECONDS,
+              EXECUTOR_INTERVAL_SECONDS,
+              TimeUnit.SECONDS);
+      LOGGER.info("Pipe periodical job executor is started successfully.");
+    }
+  }
+
+  private synchronized void execute() {
+    ++rounds;
+
+    if (rounds % cronEventInjectRoundsInterval == 0) {
+      PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
+    }
+
+    if (rounds % memoryExpandRoundsInterval == 0) {
+      PipeResourceManager.memory().tryExpandAll();
+    }
+  }
+
+  public synchronized void stop() {
+    if (executorFuture != null) {
+      executorFuture.cancel(false);
+      executorFuture = null;
+      LOGGER.info("Pipe periodical job executor is stopped successfully.");
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index f7a2a5595f7..e9c6a50920f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -46,7 +46,8 @@ public class PipeRuntimeAgent implements IService {
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
-  private final PipeCronEventInjector pipeCronEventInjector = new 
PipeCronEventInjector();
+  private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+      new PipePeriodicalJobExecutor();
 
   private final SimpleConsensusProgressIndexAssigner 
simpleConsensusProgressIndexAssigner =
       new SimpleConsensusProgressIndexAssigner();
@@ -69,7 +70,7 @@ public class PipeRuntimeAgent implements IService {
   public synchronized void start() throws StartupException {
     PipeConfig.getInstance().printAllConfigs();
     PipeAgentLauncher.launchPipeTaskAgent();
-    pipeCronEventInjector.start();
+    pipePeriodicalJobExecutor.start();
 
     isShutdown.set(false);
   }
@@ -81,7 +82,7 @@ public class PipeRuntimeAgent implements IService {
     }
     isShutdown.set(true);
 
-    pipeCronEventInjector.stop();
+    pipePeriodicalJobExecutor.stop();
     PipeAgent.task().dropAllPipeTasks();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index 9de0dc84678..d08b9d50233 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -65,7 +65,7 @@ public class IoTDBThriftAsyncPipeTransferBatchReqBuilder 
extends PipeTransferBat
       bufferSize += req.getBody().length;
     }
 
-    return bufferSize >= maxBatchSizeInBytes
+    return bufferSize >= getMaxBatchSizeInBytes()
         || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 7347af503a7..bd6244ed600 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -59,7 +59,7 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder 
extends PipeTransferBatc
       bufferSize += req.getBody().length;
     }
 
-    return bufferSize >= maxBatchSizeInBytes
+    return bufferSize >= getMaxBatchSizeInBytes()
         || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index e301645e865..ff0e066dc8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -60,7 +60,6 @@ public abstract class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   // limit in buffer size
   protected final PipeMemoryBlock allocatedMemoryBlock;
-  protected final long maxBatchSizeInBytes;
   protected long bufferSize = 0;
 
   protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
@@ -74,13 +73,28 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
         parameters.getLongOrDefault(
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
             CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
-    allocatedMemoryBlock = 
PipeResourceManager.memory().tryAllocate(requestMaxBatchSizeInBytes);
-    maxBatchSizeInBytes = allocatedMemoryBlock.getMemoryUsageInBytes();
-    if (maxBatchSizeInBytes != requestMaxBatchSizeInBytes) {
+
+    allocatedMemoryBlock =
+        PipeResourceManager.memory()
+            .tryAllocate(requestMaxBatchSizeInBytes)
+            .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 0))
+            .setShrinkCallback(
+                (oldMemory, newMemory) ->
+                    LOGGER.info(
+                        "The batch size limit has shrunk from {} to {}.", 
oldMemory, newMemory))
+            .setExpandMethod(
+                (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2, 
requestMaxBatchSizeInBytes))
+            .setExpandCallback(
+                (oldMemory, newMemory) ->
+                    LOGGER.info(
+                        "The batch size limit has expanded from {} to {}.", 
oldMemory, newMemory));
+
+    if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
       LOGGER.info(
-          "PipeTransferBatchReqBuilder: the max batch size is adjusted from {} 
to {}.",
+          "PipeTransferBatchReqBuilder: the max batch size is adjusted from {} 
to {} due to the "
+              + "memory restriction",
           requestMaxBatchSizeInBytes,
-          maxBatchSizeInBytes);
+          getMaxBatchSizeInBytes());
     }
   }
 
@@ -88,6 +102,10 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     return reqs;
   }
 
+  protected long getMaxBatchSizeInBytes() {
+    return allocatedMemoryBlock.getMemoryUsageInBytes();
+  }
+
   public boolean isEmpty() {
     return reqs.isEmpty();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 39a23e0a9d5..7c8b19c0ea5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -56,7 +56,7 @@ public class DisruptorQueue {
     allocatedMemoryBlock =
         PipeResourceManager.memory()
             .tryAllocate(
-                ringBufferSize * ringBufferEntrySizeInBytes, (currentSize) -> 
currentSize / 2);
+                ringBufferSize * ringBufferEntrySizeInBytes, currentSize -> 
currentSize / 2);
 
     disruptor =
         new Disruptor<>(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 331eb32bd3c..7523965aa91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -21,18 +21,133 @@ package org.apache.iotdb.db.pipe.resource.memory;
 
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
 public class PipeMemoryBlock implements AutoCloseable {
 
-  private final long memoryUsageInBytes;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeMemoryBlock.class);
+
+  private final PipeMemoryManager pipeMemoryManager = 
PipeResourceManager.memory();
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private final AtomicLong memoryUsageInBytes = new AtomicLong(0);
+
+  private final AtomicReference<Function<Long, Long>> shrinkMethod = new 
AtomicReference<>();
+  private final AtomicReference<BiConsumer<Long, Long>> shrinkCallback = new 
AtomicReference<>();
+  private final AtomicReference<Function<Long, Long>> expandMethod = new 
AtomicReference<>();
+  private final AtomicReference<BiConsumer<Long, Long>> expandCallback = new 
AtomicReference<>();
 
   private volatile boolean isReleased = false;
 
   public PipeMemoryBlock(long memoryUsageInBytes) {
-    this.memoryUsageInBytes = memoryUsageInBytes;
+    this.memoryUsageInBytes.set(memoryUsageInBytes);
   }
 
   public long getMemoryUsageInBytes() {
-    return memoryUsageInBytes;
+    return memoryUsageInBytes.get();
+  }
+
+  public void setMemoryUsageInBytes(long memoryUsageInBytes) {
+    this.memoryUsageInBytes.set(memoryUsageInBytes);
+  }
+
+  public PipeMemoryBlock setShrinkMethod(Function<Long, Long> shrinkMethod) {
+    this.shrinkMethod.set(shrinkMethod);
+    return this;
+  }
+
+  public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long> 
shrinkCallback) {
+    this.shrinkCallback.set(shrinkCallback);
+    return this;
+  }
+
+  public PipeMemoryBlock setExpandMethod(Function<Long, Long> extendMethod) {
+    this.expandMethod.set(extendMethod);
+    return this;
+  }
+
+  public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long> 
expandCallback) {
+    this.expandCallback.set(expandCallback);
+    return this;
+  }
+
+  boolean shrink() {
+    if (lock.tryLock()) {
+      try {
+        return doShrink();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  private boolean doShrink() {
+    if (shrinkMethod.get() == null) {
+      return false;
+    }
+
+    final long oldMemorySizeInBytes = memoryUsageInBytes.get();
+    final long newMemorySizeInBytes = 
shrinkMethod.get().apply(memoryUsageInBytes.get());
+
+    final long memoryInBytesCanBeReleased = oldMemorySizeInBytes - 
newMemorySizeInBytes;
+    if (memoryInBytesCanBeReleased <= 0
+        || !pipeMemoryManager.release(this, memoryInBytesCanBeReleased)) {
+      return false;
+    }
+
+    if (shrinkCallback.get() != null) {
+      try {
+        shrinkCallback.get().accept(oldMemorySizeInBytes, 
newMemorySizeInBytes);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to execute the shrink callback.", e);
+      }
+    }
+    return true;
+  }
+
+  boolean expand() {
+    if (lock.tryLock()) {
+      try {
+        return doExpand();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  private boolean doExpand() {
+    if (expandMethod.get() == null) {
+      return false;
+    }
+
+    final long oldMemorySizeInBytes = memoryUsageInBytes.get();
+    final long newMemorySizeInBytes = 
expandMethod.get().apply(memoryUsageInBytes.get());
+
+    final long memoryInBytesNeededToBeAllocated = newMemorySizeInBytes - 
oldMemorySizeInBytes;
+    if (memoryInBytesNeededToBeAllocated <= 0
+        || !pipeMemoryManager.tryAllocate(this, 
memoryInBytesNeededToBeAllocated)) {
+      return false;
+    }
+
+    if (expandCallback.get() != null) {
+      try {
+        expandCallback.get().accept(oldMemorySizeInBytes, 
newMemorySizeInBytes);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to execute the expand callback.", e);
+      }
+    }
+    return true;
   }
 
   boolean isReleased() {
@@ -45,6 +160,20 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   @Override
   public void close() {
-    PipeResourceManager.memory().release(this);
+    while (true) {
+      try {
+        if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
+          try {
+            pipeMemoryManager.release(this);
+            return;
+          } finally {
+            lock.unlock();
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Interrupted while waiting for the lock.", e);
+      }
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 644ee13e4e2..dd17f4bbad3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -30,8 +30,12 @@ import 
org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.function.Function;
+import java.util.Set;
+import java.util.function.LongUnaryOperator;
 
 public class PipeMemoryManager {
 
@@ -50,7 +54,9 @@ public class PipeMemoryManager {
   private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
       PipeConfig.getInstance().getPipeMemoryAllocateMinSizeInBytes();
 
-  private long usedMemorySizeInBytes = 0;
+  private long usedMemorySizeInBytes;
+
+  private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
 
   public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
       throws PipeRuntimeOutOfMemoryCriticalException {
@@ -60,11 +66,11 @@ public class PipeMemoryManager {
 
     for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
       if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
-        usedMemorySizeInBytes += sizeInBytes;
-        return new PipeMemoryBlock(sizeInBytes);
+        return registeredMemoryBlock(sizeInBytes);
       }
 
       try {
+        tryShrink4Allocate(sizeInBytes);
         this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -146,18 +152,17 @@ public class PipeMemoryManager {
   }
 
   public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
-    return tryAllocate(sizeInBytes, (currentSize) -> currentSize * 2 / 3);
+    return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3);
   }
 
   public synchronized PipeMemoryBlock tryAllocate(
-      long sizeInBytes, Function<Long, Long> customAllocateStrategy) {
+      long sizeInBytes, LongUnaryOperator customAllocateStrategy) {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
       return new PipeMemoryBlock(sizeInBytes);
     }
 
     if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
-      usedMemorySizeInBytes += sizeInBytes;
-      return new PipeMemoryBlock(sizeInBytes);
+      return registeredMemoryBlock(sizeInBytes);
     }
 
     long sizeToAllocateInBytes = sizeInBytes;
@@ -172,25 +177,83 @@ public class PipeMemoryManager {
             usedMemorySizeInBytes,
             sizeInBytes,
             sizeToAllocateInBytes);
-        usedMemorySizeInBytes += sizeToAllocateInBytes;
-        return new PipeMemoryBlock(sizeToAllocateInBytes);
+        return registeredMemoryBlock(sizeToAllocateInBytes);
       }
 
       sizeToAllocateInBytes =
           Math.max(
-              customAllocateStrategy.apply(sizeToAllocateInBytes),
+              customAllocateStrategy.applyAsLong(sizeToAllocateInBytes),
               MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
     }
 
-    LOGGER.warn(
-        "tryAllocate: failed to allocate memory, "
-            + "total memory size {} bytes, used memory size {} bytes, "
-            + "requested memory size {} bytes",
-        TOTAL_MEMORY_SIZE_IN_BYTES,
-        usedMemorySizeInBytes,
-        sizeInBytes);
+    if (tryShrink4Allocate(sizeToAllocateInBytes)) {
+      LOGGER.info(
+          "tryAllocate: allocated memory, "
+              + "total memory size {} bytes, used memory size {} bytes, "
+              + "original requested memory size {} bytes,"
+              + "actual requested memory size {} bytes",
+          TOTAL_MEMORY_SIZE_IN_BYTES,
+          usedMemorySizeInBytes,
+          sizeInBytes,
+          sizeToAllocateInBytes);
+      return registeredMemoryBlock(sizeToAllocateInBytes);
+    } else {
+      LOGGER.warn(
+          "tryAllocate: failed to allocate memory, "
+              + "total memory size {} bytes, used memory size {} bytes, "
+              + "requested memory size {} bytes",
+          TOTAL_MEMORY_SIZE_IN_BYTES,
+          usedMemorySizeInBytes,
+          sizeInBytes);
+      return registeredMemoryBlock(0);
+    }
+  }
+
+  public synchronized boolean tryAllocate(
+      PipeMemoryBlock block, long memoryInBytesNeededToBeAllocated) {
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || 
block.isReleased()) {
+      return false;
+    }
 
-    return new PipeMemoryBlock(0);
+    if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= 
memoryInBytesNeededToBeAllocated) {
+      usedMemorySizeInBytes += memoryInBytesNeededToBeAllocated;
+      block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() + 
memoryInBytesNeededToBeAllocated);
+      return true;
+    }
+
+    return false;
+  }
+
+  private PipeMemoryBlock registeredMemoryBlock(long sizeInBytes) {
+    usedMemorySizeInBytes += sizeInBytes;
+
+    final PipeMemoryBlock returnedMemoryBlock = new 
PipeMemoryBlock(sizeInBytes);
+    allocatedBlocks.add(returnedMemoryBlock);
+    return returnedMemoryBlock;
+  }
+
+  private boolean tryShrink4Allocate(long sizeInBytes) {
+    final List<PipeMemoryBlock> shuffledBlocks = new 
ArrayList<>(allocatedBlocks);
+    Collections.shuffle(shuffledBlocks);
+
+    while (true) {
+      boolean hasAtLeastOneBlockShrinkable = false;
+      for (final PipeMemoryBlock block : shuffledBlocks) {
+        if (block.shrink()) {
+          hasAtLeastOneBlockShrinkable = true;
+          if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= 
sizeInBytes) {
+            return true;
+          }
+        }
+      }
+      if (!hasAtLeastOneBlockShrinkable) {
+        return false;
+      }
+    }
+  }
+
+  public synchronized void tryExpandAll() {
+    allocatedBlocks.forEach(PipeMemoryBlock::expand);
   }
 
   public synchronized void release(PipeMemoryBlock block) {
@@ -198,12 +261,26 @@ public class PipeMemoryManager {
       return;
     }
 
+    allocatedBlocks.remove(block);
     usedMemorySizeInBytes -= block.getMemoryUsageInBytes();
     block.markAsReleased();
 
     this.notifyAll();
   }
 
+  public synchronized boolean release(PipeMemoryBlock block, long sizeInBytes) 
{
+    if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || 
block.isReleased()) {
+      return false;
+    }
+
+    usedMemorySizeInBytes -= sizeInBytes;
+    block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() - sizeInBytes);
+
+    this.notifyAll();
+
+    return true;
+  }
+
   public long getUsedMemorySizeInBytes() {
     return usedMemorySizeInBytes;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index ecb4bb3483a..b65bd1bb4e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -36,6 +36,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Weigher;
+import com.google.common.util.concurrent.AtomicDouble;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -50,6 +51,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** This cache is used by {@link WALEntryPosition}. */
 public class WALInsertNodeCache {
@@ -59,7 +61,9 @@ public class WALInsertNodeCache {
 
   // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final PipeMemoryBlock allocatedMemoryBlock;
-  private boolean isBatchLoadEnabled;
+  // Used to adjust the memory usage of the cache
+  private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
+  private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> 
lruCache;
 
   // ids of all pinned memTables
@@ -68,21 +72,55 @@ public class WALInsertNodeCache {
   private volatile boolean hasPipeRunning = false;
 
   private WALInsertNodeCache(Integer dataRegionId) {
+    final long requestedAllocateSize =
+        (long)
+            Math.min(
+                2 * CONFIG.getWalFileSizeThresholdInByte(),
+                CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
     allocatedMemoryBlock =
         PipeResourceManager.memory()
-            .tryAllocate(
-                (long)
-                    Math.min(
-                        2 * CONFIG.getWalFileSizeThresholdInByte(),
-                        CONFIG.getAllocateMemoryForPipe() * 0.8 / 5));
-    isBatchLoadEnabled =
-        allocatedMemoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte();
+            .tryAllocate(requestedAllocateSize)
+            .setShrinkMethod((oldMemory) -> Math.max(oldMemory / 2, 1))
+            .setShrinkCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() * ((double) oldMemory / newMemory));
+                  isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
+                  LOGGER.info(
+                      "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk from {} to {}.",
+                      dataRegionId,
+                      oldMemory,
+                      newMemory);
+                })
+            .setExpandMethod(
+                (oldMemory) -> Math.min(Math.max(oldMemory, 1) * 2, requestedAllocateSize))
+            .setExpandCallback(
+                (oldMemory, newMemory) -> {
+                  memoryUsageCheatFactor.set(
+                      memoryUsageCheatFactor.get() / ((double) newMemory / oldMemory));
+                  isBatchLoadEnabled.set(newMemory >= CONFIG.getWalFileSizeThresholdInByte());
+                  LOGGER.info(
+                      "WALInsertNodeCache allocatedMemoryBlock of dataRegion {} has expanded from {} to {}.",
+                      dataRegionId,
+                      oldMemory,
+                      newMemory);
+                });
+    isBatchLoadEnabled.set(
+        allocatedMemoryBlock.getMemoryUsageInBytes() >= CONFIG.getWalFileSizeThresholdInByte());
     lruCache =
         Caffeine.newBuilder()
             .maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
             .weigher(
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
-                    (position, pair) -> position.getSize())
+                    (position, pair) -> {
+                      final long weightInLong =
+                          (long) (position.getSize() * memoryUsageCheatFactor.get());
+                      if (weightInLong <= 0) {
+                        return Integer.MAX_VALUE;
+                      }
+                      final int weightInInt = (int) weightInLong;
+                      return weightInInt != weightInLong ? Integer.MAX_VALUE : weightInInt;
+                    })
             .recordStats()
             .build(new WALInsertNodeCacheLoader());
     PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
@@ -153,7 +191,7 @@ public class WALInsertNodeCache {
     hasPipeRunning = true;
 
     final Pair<ByteBuffer, InsertNode> pair =
-        isBatchLoadEnabled
+        isBatchLoadEnabled.get()
             ? lruCache.getAll(Collections.singleton(position)).get(position)
             : lruCache.get(position);
 
@@ -282,12 +320,12 @@ public class WALInsertNodeCache {
 
   @TestOnly
   public boolean isBatchLoadEnabled() {
-    return isBatchLoadEnabled;
+    return isBatchLoadEnabled.get();
   }
 
   @TestOnly
   public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
-    this.isBatchLoadEnabled = isBatchLoadEnabled;
+    this.isBatchLoadEnabled.set(isBatchLoadEnabled);
   }
 
   @TestOnly
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 7667b3af73a..e2340b02046 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -131,7 +131,7 @@ public enum ThreadName {
   PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
   PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
   PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
-  PIPE_RUNTIME_CRON_EVENT_INJECTOR("Pipe-Runtime-Cron-Event-Injector"),
+  PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
   PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
   PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
@@ -267,7 +267,7 @@ public enum ThreadName {
               PIPE_RUNTIME_META_SYNCER,
               PIPE_RUNTIME_HEARTBEAT,
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
-              PIPE_RUNTIME_CRON_EVENT_INJECTOR,
+              PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
               PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
               PIPE_WAL_RESOURCE_TTL_CHECKER,
               PIPE_RECEIVER_AIR_GAP_AGENT,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e2043142971..5ca93c3bd07 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -192,6 +192,7 @@ public class CommonConfig {
   private int pipeMemoryAllocateMaxRetries = 10;
   private long pipeMemoryAllocateMinSizeInBytes = 32;
   private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 * 1024; // 2MB
+  private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
 
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
@@ -779,6 +780,14 @@ public class CommonConfig {
         pipeMemoryAllocateForTsFileSequenceReaderInBytes;
   }
 
+  public long getPipeMemoryExpanderIntervalSeconds() {
+    return pipeMemoryExpanderIntervalSeconds;
+  }
+
+  public void setPipeMemoryExpanderIntervalSeconds(long pipeMemoryExpanderIntervalSeconds) {
+    this.pipeMemoryExpanderIntervalSeconds = pipeMemoryExpanderIntervalSeconds;
+  }
+
   public int getPipeMemoryAllocateMaxRetries() {
     return pipeMemoryAllocateMaxRetries;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6893e9b85ba..a7ae02c1bea 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -428,6 +428,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_memory_allocate_for_tsfile_sequence_reader_in_bytes",
                 String.valueOf(config.getPipeMemoryAllocateForTsFileSequenceReaderInBytes()))));
+    config.setPipeMemoryExpanderIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_memory_expander_interval_seconds",
+                String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
   }
 
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4718f4df1e7..c70def090c5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -191,6 +191,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
   }
 
+  public long getPipeMemoryExpanderIntervalSeconds() {
+    return COMMON_CONFIG.getPipeMemoryExpanderIntervalSeconds();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class);


Reply via email to