This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-stuck-by-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 42f473e0a4ca5fa1ddcf95b12a7cc12da46d7dea Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Mar 13 12:31:30 2025 +0800 Pipe: Fix pipe executor stuck by unlimited file event memory allocation retries --- .../async/handler/PipeTransferTsFileHandler.java | 15 ++++++++++++++- .../common/tsfile/PipeTsFileInsertionEvent.java | 4 +++- .../db/pipe/resource/memory/PipeMemoryManager.java | 22 ++++++++++++++-------- .../pipe/resource/tsfile/PipeTsFileResource.java | 2 +- .../dataregion/wal/utils/WALInsertNodeCache.java | 8 +++++--- 5 files changed, 37 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 3423d980079..5de2ef38948 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -113,7 +113,13 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { this.transferMod = transferMod; currentFile = transferMod ? modFile : tsFile; - waitForResourceEnough4Slicing(Integer.MAX_VALUE); + // NOTE: Waiting for resource enough for slicing here may cause deadlock! + // TsFile events are producing and consuming at the same time, and the memory of a TsFile + // event is not released until the TsFile is sealed. So if the memory is not enough for slicing, + // the TsFile event will be blocked and waiting for the memory to be released. At the same time, + // the memory of the TsFile event is not released, so the memory is not enough for slicing. This + // will cause a deadlock. 
+ waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); // 20 - 40 seconds readFileBufferSize = (int) Math.min( @@ -395,7 +401,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { memoryBlock.close(); } + /** + * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock. + */ private void waitForResourceEnough4Slicing(final long timeoutMs) throws InterruptedException { + if (!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) { + return; + } + final PipeMemoryManager memoryManager = PipeDataNodeResourceManager.memory(); if (memoryManager.isEnough4TsFileSlicing()) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 15378fc6482..4f9823709a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -402,7 +402,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent @Override public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws PipeException { - return toTabletInsertionEvents(Long.MAX_VALUE); + // 20 - 40 seconds for waiting + // Can not be unlimited or will cause deadlock + return toTabletInsertionEvents((long) ((1 + Math.random()) * 20 * 1000)); } public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long timeoutMs) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 6a2f63858f6..3ee55a5d5e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -332,13 +332,13 @@ public class PipeMemoryManager { } /** - * Allocate a {@link PipeMemoryBlock} for pipe only if memory already used is less than the - * specified threshold. + * Allocate a {@link PipeMemoryBlock} for pipe only if memory used after allocation is less than + * the specified threshold. * * @param sizeInBytes size of memory needed to allocate * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0 - * @return {@code null} if the proportion of memory already used exceeds {@code usedThreshold}. - * Will return a memory block otherwise. + * @return {@code null} if the proportion of memory used after allocation exceeds {@code + * usedThreshold}. Will return a memory block otherwise. */ public synchronized PipeMemoryBlock forceAllocateIfSufficient( long sizeInBytes, float usedThreshold) { @@ -354,13 +354,19 @@ public class PipeMemoryManager { return registerMemoryBlock(0); } - if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes - && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < usedThreshold) { + if (sizeInBytes + usedMemorySizeInBytes > TOTAL_MEMORY_SIZE_IN_BYTES) { + return null; + } + + if ((float) (usedMemorySizeInBytes + sizeInBytes) / TOTAL_MEMORY_SIZE_IN_BYTES + < usedThreshold) { return forceAllocate(sizeInBytes); } else { - long memoryToShrink = + final long memoryToShrink = Math.max( - usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold), + usedMemorySizeInBytes + + sizeInBytes + - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold), sizeInBytes); if (tryShrink4Allocate(memoryToShrink)) { return forceAllocate(sizeInBytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java index 1c2a46e9b59..fec0c837138 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java @@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResource.class); public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20; - private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f; + private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f; private final File hardlinkOrCopiedFile; private final boolean isTsFile; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java index 0541ff96188..efba8703072 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java @@ -77,9 +77,11 @@ public class WALInsertNodeCache { final long requestedAllocateSize = (long) Math.min( - (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount() - * CONFIG.getWalFileSizeThresholdInByte(), - CONFIG.getAllocateMemoryForPipe() * 0.45); + 1.0 + * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount() + * CONFIG.getWalFileSizeThresholdInByte() + / CONFIG.getDataRegionNum(), + 0.5 * CONFIG.getAllocateMemoryForPipe() / CONFIG.getDataRegionNum()); allocatedMemoryBlock = PipeDataNodeResourceManager.memory() .tryAllocate(requestedAllocateSize)
