This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch pipe-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-fix by this push:
     new 0d3054d47c4 fix
0d3054d47c4 is described below

commit 0d3054d47c4221bc272cf66aa01de9b1d010602e
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 24 10:00:59 2026 +0800

    fix
---
 .../iotdb/db/pipe/resource/memory/PipeMemoryBlock.java       |  5 +++++
 .../iotdb/db/pipe/resource/memory/PipeMemoryManager.java     | 12 +++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 5f4fcf0dd77..ee71c94508c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +63,7 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) {
     this.shrinkMethod.set(shrinkMethod);
+    pipeMemoryManager.addShrinkableBlock(this);
     return this;
   }
 
@@ -177,6 +179,9 @@ public class PipeMemoryBlock implements AutoCloseable {
         if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
           try {
             pipeMemoryManager.release(this);
+            if (Objects.nonNull(shrinkMethod.get())) {
+              pipeMemoryManager.removeShrinkableBlock(this);
+            }
             if (isInterrupted) {
               LOGGER.warn("{} is released after thread interruption.", this);
             }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index ada788363d7..797d65fe6fa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -61,6 +61,7 @@ public class PipeMemoryManager {
 
   // Only non-zero memory blocks will be added to this set.
   private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
+  private final Set<PipeMemoryBlock> shrinkableBlocks = new HashSet<>();
 
   public PipeMemoryManager() {
     PipeDataNodeAgent.runtime()
@@ -528,8 +529,9 @@ public class PipeMemoryManager {
     return returnedMemoryBlock;
   }
 
+  // Single-threaded logic
   private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
-    final List<PipeMemoryBlock> shuffledBlocks = new ArrayList<>(allocatedBlocks);
+    final List<PipeMemoryBlock> shuffledBlocks = new ArrayList<>(shrinkableBlocks);
     Collections.shuffle(shuffledBlocks);
 
     while (true) {
@@ -549,6 +551,14 @@ public class PipeMemoryManager {
     }
   }
 
+  void addShrinkableBlock(final PipeMemoryBlock block) {
+    shrinkableBlocks.add(block);
+  }
+
+  void removeShrinkableBlock(final PipeMemoryBlock block) {
+    shrinkableBlocks.remove(block);
+  }
+
   public synchronized void tryExpandAllAndCheckConsistency() {
     allocatedBlocks.forEach(PipeMemoryBlock::expand);
 

Reply via email to