This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new edfd97d9c6a Pipe: Optimized the shrink/expand method of blocks (#17555)
edfd97d9c6a is described below
commit edfd97d9c6a3514a711d1227fb6be3b5f23387e5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 29 10:10:46 2026 +0800
Pipe: Optimized the shrink/expand method of blocks (#17555)
---
.../db/pipe/resource/memory/PipeMemoryBlock.java | 9 +++
.../db/pipe/resource/memory/PipeMemoryManager.java | 91 +++++++++++++---------
2 files changed, 65 insertions(+), 35 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 07f5b904523..62069a2fb25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,6 +63,7 @@ public class PipeMemoryBlock implements AutoCloseable {
public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod)
{
this.shrinkMethod.set(shrinkMethod);
+ pipeMemoryManager.addShrinkableBlock(this);
return this;
}
@@ -72,6 +74,7 @@ public class PipeMemoryBlock implements AutoCloseable {
public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod)
{
this.expandMethod.set(extendMethod);
+ pipeMemoryManager.addExpandableBlock(this);
return this;
}
@@ -177,6 +180,12 @@ public class PipeMemoryBlock implements AutoCloseable {
if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
try {
pipeMemoryManager.release(this);
+ if (Objects.nonNull(shrinkMethod.get())) {
+ pipeMemoryManager.removeShrinkableBlock(this);
+ }
+ if (Objects.nonNull(expandMethod.get())) {
+ pipeMemoryManager.removeExpandableBlock(this);
+ }
if (isInterrupted) {
LOGGER.warn("{} is released after thread interruption.", this);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 87be4d5fb62..0d49487f750 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -56,6 +56,8 @@ public class PipeMemoryManager {
// Only non-zero memory blocks will be added to this set.
private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
+ private final Set<PipeMemoryBlock> shrinkableBlocks = new HashSet<>();
+ private final Set<PipeMemoryBlock> expandableBlocks = new HashSet<>();
public PipeMemoryManager() {
PipeDataNodeAgent.runtime()
@@ -531,8 +533,9 @@ public class PipeMemoryManager {
return returnedMemoryBlock;
}
+ // Single-threaded logic
private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
- final List<PipeMemoryBlock> shuffledBlocks = new
ArrayList<>(allocatedBlocks);
+ final List<PipeMemoryBlock> shuffledBlocks = new
ArrayList<>(shrinkableBlocks);
Collections.shuffle(shuffledBlocks);
while (true) {
@@ -551,46 +554,64 @@ public class PipeMemoryManager {
}
}
+ void addShrinkableBlock(final PipeMemoryBlock block) {
+ shrinkableBlocks.add(block);
+ }
+
+ void removeShrinkableBlock(final PipeMemoryBlock block) {
+ shrinkableBlocks.remove(block);
+ }
+
public synchronized void tryExpandAllAndCheckConsistency() {
- allocatedBlocks.forEach(PipeMemoryBlock::expand);
+ expandableBlocks.forEach(PipeMemoryBlock::expand);
+
+ if (LOGGER.isDebugEnabled()) {
+ final long blockSum =
+
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
+ if (blockSum != usedMemorySizeInBytes) {
+ LOGGER.debug(
+ "tryExpandAllAndCheckConsistency: memory usage is not consistent
with allocated blocks,"
+ + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
+ usedMemorySizeInBytes,
+ blockSum);
+ }
- long blockSum =
-
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
- if (blockSum != usedMemorySizeInBytes) {
- LOGGER.warn(
- "tryExpandAllAndCheckConsistency: memory usage is not consistent
with allocated blocks,"
- + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
- usedMemorySizeInBytes,
- blockSum);
- }
+ final long tabletBlockSum =
+ allocatedBlocks.stream()
+ .filter(PipeTabletMemoryBlock.class::isInstance)
+ .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+ .sum();
+ if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
+ LOGGER.debug(
+ "tryExpandAllAndCheckConsistency: memory usage of tablets is not
consistent with allocated blocks,"
+ + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet
blocks is {}",
+ usedMemorySizeInBytesOfTablets,
+ tabletBlockSum);
+ }
- long tabletBlockSum =
- allocatedBlocks.stream()
- .filter(PipeTabletMemoryBlock.class::isInstance)
- .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
- .sum();
- if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
- LOGGER.warn(
- "tryExpandAllAndCheckConsistency: memory usage of tablets is not
consistent with allocated blocks,"
- + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet
blocks is {}",
- usedMemorySizeInBytesOfTablets,
- tabletBlockSum);
- }
-
- long tsFileBlockSum =
- allocatedBlocks.stream()
- .filter(PipeTsFileMemoryBlock.class::isInstance)
- .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
- .sum();
- if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
- LOGGER.warn(
- "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not
consistent with allocated blocks,"
- + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile
blocks is {}",
- usedMemorySizeInBytesOfTsFiles,
- tsFileBlockSum);
+ final long tsFileBlockSum =
+ allocatedBlocks.stream()
+ .filter(PipeTsFileMemoryBlock.class::isInstance)
+ .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+ .sum();
+ if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
+ LOGGER.debug(
+ "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not
consistent with allocated blocks,"
+ + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile
blocks is {}",
+ usedMemorySizeInBytesOfTsFiles,
+ tsFileBlockSum);
+ }
}
}
+ void addExpandableBlock(final PipeMemoryBlock block) {
+ expandableBlocks.add(block);
+ }
+
+ void removeExpandableBlock(final PipeMemoryBlock block) {
+ expandableBlocks.remove(block);
+ }
+
public synchronized void release(PipeMemoryBlock block) {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null ||
block.isReleased()) {
return;