This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-stuck-by-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 42f473e0a4ca5fa1ddcf95b12a7cc12da46d7dea
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Mar 13 12:31:30 2025 +0800

    Pipe: Fix pipe executor getting stuck due to unlimited memory allocation retries for file events
---
 .../async/handler/PipeTransferTsFileHandler.java   | 15 ++++++++++++++-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +++-
 .../db/pipe/resource/memory/PipeMemoryManager.java | 22 ++++++++++++++--------
 .../pipe/resource/tsfile/PipeTsFileResource.java   |  2 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  8 +++++---
 5 files changed, 37 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3423d980079..5de2ef38948 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -113,7 +113,13 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.transferMod = transferMod;
     currentFile = transferMod ? modFile : tsFile;
 
-    waitForResourceEnough4Slicing(Integer.MAX_VALUE);
+    // NOTE: Waiting for resource enough for slicing here may cause deadlock!
+    // TsFile events are producing and consuming at the same time, and the 
memory of a TsFile
+    // event is not released until the TsFile is sealed. So if the memory is 
not enough for slicing,
+    // the TsFile event will be blocked and waiting for the memory to be 
released. At the same time,
+    // the memory of the TsFile event is not released, so the memory is not 
enough for slicing. This
+    // will cause a deadlock.
+    waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); 
// 20 - 40 seconds
     readFileBufferSize =
         (int)
             Math.min(
@@ -395,7 +401,14 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     memoryBlock.close();
   }
 
+  /**
+   * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
+   */
   private void waitForResourceEnough4Slicing(final long timeoutMs) throws 
InterruptedException {
+    if 
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) 
{
+      return;
+    }
+
     final PipeMemoryManager memoryManager = 
PipeDataNodeResourceManager.memory();
     if (memoryManager.isEnough4TsFileSlicing()) {
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 15378fc6482..4f9823709a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -402,7 +402,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws 
PipeException {
-    return toTabletInsertionEvents(Long.MAX_VALUE);
+    // 20 - 40 seconds for waiting
+    // Can not be unlimited or will cause deadlock
+    return toTabletInsertionEvents((long) ((1 + Math.random()) * 20 * 1000));
   }
 
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long 
timeoutMs)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6a2f63858f6..3ee55a5d5e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -332,13 +332,13 @@ public class PipeMemoryManager {
   }
 
   /**
-   * Allocate a {@link PipeMemoryBlock} for pipe only if memory already used 
is less than the
-   * specified threshold.
+   * Allocate a {@link PipeMemoryBlock} for pipe only if memory used after 
allocation is less than
+   * the specified threshold.
    *
    * @param sizeInBytes size of memory needed to allocate
    * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0
-   * @return {@code null} if the proportion of memory already used exceeds 
{@code usedThreshold}.
-   *     Will return a memory block otherwise.
+   * @return {@code null} if the proportion of memory used after allocation 
exceeds {@code
+   *     usedThreshold}. Will return a memory block otherwise.
    */
   public synchronized PipeMemoryBlock forceAllocateIfSufficient(
       long sizeInBytes, float usedThreshold) {
@@ -354,13 +354,19 @@ public class PipeMemoryManager {
       return registerMemoryBlock(0);
     }
 
-    if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
-        && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < 
usedThreshold) {
+    if (sizeInBytes + usedMemorySizeInBytes > TOTAL_MEMORY_SIZE_IN_BYTES) {
+      return null;
+    }
+
+    if ((float) (usedMemorySizeInBytes + sizeInBytes) / 
TOTAL_MEMORY_SIZE_IN_BYTES
+        < usedThreshold) {
       return forceAllocate(sizeInBytes);
     } else {
-      long memoryToShrink =
+      final long memoryToShrink =
           Math.max(
-              usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * 
usedThreshold),
+              usedMemorySizeInBytes
+                  + sizeInBytes
+                  - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold),
               sizeInBytes);
       if (tryShrink4Allocate(memoryToShrink)) {
         return forceAllocate(sizeInBytes);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 1c2a46e9b59..fec0c837138 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResource.class);
 
   public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
-  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
 
   private final File hardlinkOrCopiedFile;
   private final boolean isTsFile;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 0541ff96188..efba8703072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -77,9 +77,11 @@ public class WALInsertNodeCache {
     final long requestedAllocateSize =
         (long)
             Math.min(
-                (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
-                    * CONFIG.getWalFileSizeThresholdInByte(),
-                CONFIG.getAllocateMemoryForPipe() * 0.45);
+                1.0
+                    * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+                    * CONFIG.getWalFileSizeThresholdInByte()
+                    / CONFIG.getDataRegionNum(),
+                0.5 * CONFIG.getAllocateMemoryForPipe() / 
CONFIG.getDataRegionNum());
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)

Reply via email to