This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new edfd97d9c6a Pipe: Optimized the shrink/expand method of blocks (#17555)
edfd97d9c6a is described below

commit edfd97d9c6a3514a711d1227fb6be3b5f23387e5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 29 10:10:46 2026 +0800

    Pipe: Optimized the shrink/expand method of blocks (#17555)
---
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |  9 +++
 .../db/pipe/resource/memory/PipeMemoryManager.java | 91 +++++++++++++---------
 2 files changed, 65 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 07f5b904523..62069a2fb25 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +63,7 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) 
{
     this.shrinkMethod.set(shrinkMethod);
+    pipeMemoryManager.addShrinkableBlock(this);
     return this;
   }
 
@@ -72,6 +74,7 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) 
{
     this.expandMethod.set(extendMethod);
+    pipeMemoryManager.addExpandableBlock(this);
     return this;
   }
 
@@ -177,6 +180,12 @@ public class PipeMemoryBlock implements AutoCloseable {
         if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
           try {
             pipeMemoryManager.release(this);
+            if (Objects.nonNull(shrinkMethod.get())) {
+              pipeMemoryManager.removeShrinkableBlock(this);
+            }
+            if (Objects.nonNull(expandMethod.get())) {
+              pipeMemoryManager.removeExpandableBlock(this);
+            }
             if (isInterrupted) {
               LOGGER.warn("{} is released after thread interruption.", this);
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 87be4d5fb62..0d49487f750 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -56,6 +56,8 @@ public class PipeMemoryManager {
 
   // Only non-zero memory blocks will be added to this set.
   private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
+  private final Set<PipeMemoryBlock> shrinkableBlocks = new HashSet<>();
+  private final Set<PipeMemoryBlock> expandableBlocks = new HashSet<>();
 
   public PipeMemoryManager() {
     PipeDataNodeAgent.runtime()
@@ -531,8 +533,9 @@ public class PipeMemoryManager {
     return returnedMemoryBlock;
   }
 
+  // Single-threaded logic
   private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
-    final List<PipeMemoryBlock> shuffledBlocks = new 
ArrayList<>(allocatedBlocks);
+    final List<PipeMemoryBlock> shuffledBlocks = new 
ArrayList<>(shrinkableBlocks);
     Collections.shuffle(shuffledBlocks);
 
     while (true) {
@@ -551,46 +554,64 @@ public class PipeMemoryManager {
     }
   }
 
+  void addShrinkableBlock(final PipeMemoryBlock block) {
+    shrinkableBlocks.add(block);
+  }
+
+  void removeShrinkableBlock(final PipeMemoryBlock block) {
+    shrinkableBlocks.remove(block);
+  }
+
   public synchronized void tryExpandAllAndCheckConsistency() {
-    allocatedBlocks.forEach(PipeMemoryBlock::expand);
+    expandableBlocks.forEach(PipeMemoryBlock::expand);
+
+    if (LOGGER.isDebugEnabled()) {
+      final long blockSum =
+          
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
+      if (blockSum != usedMemorySizeInBytes) {
+        LOGGER.debug(
+            "tryExpandAllAndCheckConsistency: memory usage is not consistent 
with allocated blocks,"
+                + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
+            usedMemorySizeInBytes,
+            blockSum);
+      }
 
-    long blockSum =
-        
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
-    if (blockSum != usedMemorySizeInBytes) {
-      LOGGER.warn(
-          "tryExpandAllAndCheckConsistency: memory usage is not consistent 
with allocated blocks,"
-              + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
-          usedMemorySizeInBytes,
-          blockSum);
-    }
+      final long tabletBlockSum =
+          allocatedBlocks.stream()
+              .filter(PipeTabletMemoryBlock.class::isInstance)
+              .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+              .sum();
+      if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
+        LOGGER.debug(
+            "tryExpandAllAndCheckConsistency: memory usage of tablets is not 
consistent with allocated blocks,"
+                + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet 
blocks is {}",
+            usedMemorySizeInBytesOfTablets,
+            tabletBlockSum);
+      }
 
-    long tabletBlockSum =
-        allocatedBlocks.stream()
-            .filter(PipeTabletMemoryBlock.class::isInstance)
-            .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
-            .sum();
-    if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
-      LOGGER.warn(
-          "tryExpandAllAndCheckConsistency: memory usage of tablets is not 
consistent with allocated blocks,"
-              + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet 
blocks is {}",
-          usedMemorySizeInBytesOfTablets,
-          tabletBlockSum);
-    }
-
-    long tsFileBlockSum =
-        allocatedBlocks.stream()
-            .filter(PipeTsFileMemoryBlock.class::isInstance)
-            .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
-            .sum();
-    if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
-      LOGGER.warn(
-          "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not 
consistent with allocated blocks,"
-              + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile 
blocks is {}",
-          usedMemorySizeInBytesOfTsFiles,
-          tsFileBlockSum);
+      final long tsFileBlockSum =
+          allocatedBlocks.stream()
+              .filter(PipeTsFileMemoryBlock.class::isInstance)
+              .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+              .sum();
+      if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
+        LOGGER.debug(
+            "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not 
consistent with allocated blocks,"
+                + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile 
blocks is {}",
+            usedMemorySizeInBytesOfTsFiles,
+            tsFileBlockSum);
+      }
     }
   }
 
+  void addExpandableBlock(final PipeMemoryBlock block) {
+    expandableBlocks.add(block);
+  }
+
+  void removeExpandableBlock(final PipeMemoryBlock block) {
+    expandableBlocks.remove(block);
+  }
+
   public synchronized void release(PipeMemoryBlock block) {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || 
block.isReleased()) {
       return;

Reply via email to