This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new e8efdeb01f1 Fix improper retries in dispatcher & measurements not set 
to null when inserting row out-of-ttl (#16297)
e8efdeb01f1 is described below

commit e8efdeb01f1bea1fda3fd32cd6a3debe47d6c505
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Aug 29 09:34:25 2025 +0800

    Fix improper retries in dispatcher & measurements not set to null when 
inserting row out-of-ttl (#16297)
---
 .gitignore                                         |  1 +
 .../it/session/IoTDBSessionRelationalIT.java       | 38 ++++++++++++++++++++++
 .../consensus/iot/client/DispatchLogHandler.java   |  4 +--
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 12 +++++++
 .../db/storageengine/dataregion/DataRegion.java    |  1 +
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  2 +-
 .../org/apache/iotdb/commons/utils/RetryUtils.java |  2 +-
 7 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index 111f5922f1a..a489dd048da 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,4 @@ 
iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/
 # Develocity
 .mvn/.gradle-enterprise/
 .mvn/.develocity/
+.run/
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 88b92dbdf04..c95b8cc9fb3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.TableClusterIT;
 import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
 
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
@@ -1875,4 +1877,40 @@ public class IoTDBSessionRelationalIT {
       EnvFactory.getEnv().initClusterEnvironment();
     }
   }
+
+  @Test
+  public void testSqlInsertWithExpiredTTL()
+      throws IoTDBConnectionException, StatementExecutionException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv.getConfig().getCommonConfig().setDataReplicationFactor(2);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    simpleEnv.initClusterEnvironment(1, 3);
+
+    try (ITableSession session = simpleEnv.getTableSessionConnection()) {
+      session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
+      session.executeNonQueryStatement("USE test");
+
+      session.executeNonQueryStatement("CREATE TABLE test_sql_ttl (s1 INT32)");
+      session.executeNonQueryStatement("ALTER TABLE test_sql_ttl SET 
PROPERTIES TTL=1");
+
+      for (DataNodeWrapper dataNodeWrapper : 
simpleEnv.getDataNodeWrapperList()) {
+        TableSessionBuilder tableSessionBuilder = new TableSessionBuilder();
+        tableSessionBuilder.nodeUrls(
+            Collections.singletonList(dataNodeWrapper.getIpAndPortString()));
+        tableSessionBuilder.database("test");
+        try (ITableSession subSession = tableSessionBuilder.build()) {
+          subSession.executeNonQueryStatement("INSERT INTO test_sql_ttl (time, 
s1) VALUES (10, 1)");
+        } catch (StatementExecutionException e) {
+          if (!e.getMessage().contains("is less than ttl time bound")) {
+            throw e;
+          }
+        }
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index d587e25ee19..e190c27a908 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -62,10 +62,10 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   @Override
   public void onComplete(TSyncLogEntriesRes response) {
     if (response.getStatuses().stream()
-        .anyMatch(status -> 
RetryUtils.needRetryForConsensus(status.getCode()))) {
+        .anyMatch(status -> RetryUtils.needRetryForWrite(status.getCode()))) {
       List<String> retryStatusMessages =
           response.getStatuses().stream()
-              .filter(status -> 
RetryUtils.needRetryForConsensus(status.getCode()))
+              .filter(status -> RetryUtils.needRetryForWrite(status.getCode()))
               .map(TSStatus::getMessage)
               .collect(Collectors.toList());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a4f21d2f20f..8065fb28397 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -284,6 +286,16 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
           dispatchWriteOnce(shouldDispatch);
 
       // 3. decide if we need retry (we may decide the retry condition 
instance-wise, if needed)
+      Iterator<FailedFragmentInstanceWithStatus> iterator = 
failedInstances.iterator();
+      while (iterator.hasNext()) {
+        FailedFragmentInstanceWithStatus failedFragmentInstanceWithStatus = 
iterator.next();
+        if (!RetryUtils.needRetryForWrite(
+            failedFragmentInstanceWithStatus.getFailureStatus().getCode())) {
+          
dispatchFailures.add(failedFragmentInstanceWithStatus.getFailureStatus());
+          iterator.remove();
+        }
+      }
+
       final boolean shouldRetry =
           !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum 
> 1;
       if (!shouldRetry) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4bb9bc2b7cb..1997a995f12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3798,6 +3798,7 @@ public class DataRegion implements IDataRegionForQuery {
                           DateTimeUtils.convertLongToDate(
                               CommonDateTimeUtils.currentTime() - ttl))));
           
insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length);
+          insertRowNode.setMeasurements(null);
           continue;
         }
         // init map
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 779956b21d0..94325e9c969 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -96,7 +96,7 @@ public class PipeReceiverStatusHandler {
   public void handle(
       final TSStatus status, final String exceptionMessage, final String 
recordMessage) {
 
-    if (RetryUtils.needRetryForConsensus(status.getCode())) {
+    if (RetryUtils.needRetryForWrite(status.getCode())) {
       LOGGER.info("IoTConsensusV2: will retry with increasing interval. 
status: {}", status);
       throw new PipeConsensusRetryWithIncreasingIntervalException(
           exceptionMessage, Integer.MAX_VALUE);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index 2bd4b7ba395..62b656364a5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -30,7 +30,7 @@ public class RetryUtils {
     T call() throws E;
   }
 
-  public static boolean needRetryForConsensus(int statusCode) {
+  public static boolean needRetryForWrite(int statusCode) {
     return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
         || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
         || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();

Reply via email to