This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d4605c4128b Pipe: Add retry for tablet batch req to avoid
retransmission when memory is insufficient (#15715) (#15771)
d4605c4128b is described below
commit d4605c4128bab845fe0a8f621e5e3288d6a7fb6e
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Jun 18 18:41:22 2025 +0800
Pipe: Add retry for tablet batch req to avoid retransmission when memory is
insufficient (#15715) (#15771)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 70 +++++++++++++---------
1 file changed, 43 insertions(+), 27 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 7719a338fb6..00a7e4c46a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -399,10 +399,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
Stream.of(
statementPair.getLeft().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
+ :
executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
statementPair.getRight().isEmpty()
? RpcUtils.SUCCESS_STATUS
- :
executeStatementAndAddRedirectInfo(statementPair.getRight()))
+ :
executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
.collect(Collectors.toList())));
}
@@ -594,8 +594,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
* request. So for each sub-status which needs to redirect, we record the
device path using the
* message field.
*/
- private TSStatus executeStatementAndAddRedirectInfo(final
InsertBaseStatement statement) {
- final TSStatus result = executeStatementAndClassifyExceptions(statement);
+ private TSStatus executeBatchStatementAndAddRedirectInfo(final
InsertBaseStatement statement) {
+ final TSStatus result = executeStatementAndClassifyExceptions(statement,
5);
if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
&& result.getSubStatusSize() > 0) {
@@ -631,18 +631,50 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TSStatus executeStatementAndClassifyExceptions(final Statement
statement) {
+ return executeStatementAndClassifyExceptions(statement, 1);
+ }
+
+ private TSStatus executeStatementAndClassifyExceptions(
+ final Statement statement, final int tryCount) {
long estimatedMemory = 0L;
try {
if (statement instanceof InsertBaseStatement) {
estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
- allocatedMemoryBlock =
- PipeDataNodeResourceManager.memory()
- .forceAllocate(
- (long)
- (estimatedMemory
- * PipeConfig.getInstance()
-
.getPipeReceiverActualToEstimatedMemoryRatio()));
+ for (int i = 0; i < tryCount; ++i) {
+ try {
+ allocatedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocate(
+ (long)
+ (estimatedMemory
+ * PipeConfig.getInstance()
+
.getPipeReceiverActualToEstimatedMemoryRatio()));
+ break;
+ } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
+ if (i == tryCount - 1) {
+ final String message =
+ String.format(
+ "Temporarily out of memory when executing statement %s,
Requested memory: %s, "
+ + "used memory: %s, free memory: %s, total
non-floating memory: %s",
+ statement,
+ estimatedMemory
+ *
PipeConfig.getInstance().getPipeReceiverActualToEstimatedMemoryRatio(),
+
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
+
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Receiver id = {}: {}", receiverId.get(),
message, e);
+ }
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(message);
+ } else {
+ Thread.sleep(100L * (i + 1));
+ }
+ }
+ }
}
+
final TSStatus result =
executeStatementWithRetryOnDataTypeMismatch(statement);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -655,22 +687,6 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
result);
return statement.accept(STATEMENT_STATUS_VISITOR, result);
}
- } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
- final String message =
- String.format(
- "Temporarily out of memory when executing statement %s,
Requested memory: %s, "
- + "used memory: %s, free memory: %s, total non-floating
memory: %s",
- statement,
- estimatedMemory,
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
- PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
-
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
- }
- return new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(message);
} catch (final Exception e) {
LOGGER.warn(
"Receiver id = {}: Exception encountered while executing statement
{}: ",