This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d4605c4128b Pipe: Add retry for tablet batch req to avoid retransmission when memory is insufficient (#15715) (#15771)
d4605c4128b is described below

commit d4605c4128bab845fe0a8f621e5e3288d6a7fb6e
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 18 18:41:22 2025 +0800

    Pipe: Add retry for tablet batch req to avoid retransmission when memory is insufficient (#15715) (#15771)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 70 +++++++++++++---------
 1 file changed, 43 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 7719a338fb6..00a7e4c46a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -399,10 +399,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             Stream.of(
                     statementPair.getLeft().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
+                        : 
executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
                     statementPair.getRight().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndAddRedirectInfo(statementPair.getRight()))
+                        : 
executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
                 .collect(Collectors.toList())));
   }
 
@@ -594,8 +594,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
    * request. So for each sub-status which needs to redirect, we record the 
device path using the
    * message field.
    */
-  private TSStatus executeStatementAndAddRedirectInfo(final 
InsertBaseStatement statement) {
-    final TSStatus result = executeStatementAndClassifyExceptions(statement);
+  private TSStatus executeBatchStatementAndAddRedirectInfo(final 
InsertBaseStatement statement) {
+    final TSStatus result = executeStatementAndClassifyExceptions(statement, 
5);
 
     if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
         && result.getSubStatusSize() > 0) {
@@ -631,18 +631,50 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
+    return executeStatementAndClassifyExceptions(statement, 1);
+  }
+
+  private TSStatus executeStatementAndClassifyExceptions(
+      final Statement statement, final int tryCount) {
     long estimatedMemory = 0L;
     try {
       if (statement instanceof InsertBaseStatement) {
         estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
-        allocatedMemoryBlock =
-            PipeDataNodeResourceManager.memory()
-                .forceAllocate(
-                    (long)
-                        (estimatedMemory
-                            * PipeConfig.getInstance()
-                                
.getPipeReceiverActualToEstimatedMemoryRatio()));
+        for (int i = 0; i < tryCount; ++i) {
+          try {
+            allocatedMemoryBlock =
+                PipeDataNodeResourceManager.memory()
+                    .forceAllocate(
+                        (long)
+                            (estimatedMemory
+                                * PipeConfig.getInstance()
+                                    
.getPipeReceiverActualToEstimatedMemoryRatio()));
+            break;
+          } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+            if (i == tryCount - 1) {
+              final String message =
+                  String.format(
+                      "Temporarily out of memory when executing statement %s, 
Requested memory: %s, "
+                          + "used memory: %s, free memory: %s, total 
non-floating memory: %s",
+                      statement,
+                      estimatedMemory
+                          * 
PipeConfig.getInstance().getPipeReceiverActualToEstimatedMemoryRatio(),
+                      
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+                      
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+                      
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Receiver id = {}: {}", receiverId.get(), 
message, e);
+              }
+              return new TSStatus(
+                      
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                  .setMessage(message);
+            } else {
+              Thread.sleep(100L * (i + 1));
+            }
+          }
+        }
       }
+
       final TSStatus result = 
executeStatementWithRetryOnDataTypeMismatch(statement);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -655,22 +687,6 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             result);
         return statement.accept(STATEMENT_STATUS_VISITOR, result);
       }
-    } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
-      final String message =
-          String.format(
-              "Temporarily out of memory when executing statement %s, 
Requested memory: %s, "
-                  + "used memory: %s, free memory: %s, total non-floating 
memory: %s",
-              statement,
-              estimatedMemory,
-              PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
-              PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
-              
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
-      }
-      return new TSStatus(
-              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(message);
     } catch (final Exception e) {
       LOGGER.warn(
           "Receiver id = {}: Exception encountered while executing statement 
{}: ",

Reply via email to