This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new e8efdeb01f1 Fix improper retries in dispatcher & measurements not set
to null when inserting row out-of-ttl (#16297)
e8efdeb01f1 is described below
commit e8efdeb01f1bea1fda3fd32cd6a3debe47d6c505
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Aug 29 09:34:25 2025 +0800
Fix improper retries in dispatcher & measurements not set to null when
inserting row out-of-ttl (#16297)
---
.gitignore | 1 +
.../it/session/IoTDBSessionRelationalIT.java | 38 ++++++++++++++++++++++
.../consensus/iot/client/DispatchLogHandler.java | 4 +--
.../scheduler/FragmentInstanceDispatcherImpl.java | 12 +++++++
.../db/storageengine/dataregion/DataRegion.java | 1 +
.../pipe/receiver/PipeReceiverStatusHandler.java | 2 +-
.../org/apache/iotdb/commons/utils/RetryUtils.java | 2 +-
7 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/.gitignore b/.gitignore
index 111f5922f1a..a489dd048da 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,4 @@
iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/
# Develocity
.mvn/.gradle-enterprise/
.mvn/.develocity/
+.run/
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 88b92dbdf04..c95b8cc9fb3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
@@ -1875,4 +1877,40 @@ public class IoTDBSessionRelationalIT {
EnvFactory.getEnv().initClusterEnvironment();
}
}
+
+ @Test
+ public void testSqlInsertWithExpiredTTL()
+ throws IoTDBConnectionException, StatementExecutionException {
+ SimpleEnv simpleEnv = new SimpleEnv();
+ simpleEnv.getConfig().getCommonConfig().setDataReplicationFactor(2);
+ simpleEnv
+ .getConfig()
+ .getCommonConfig()
+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+ simpleEnv.initClusterEnvironment(1, 3);
+
+ try (ITableSession session = simpleEnv.getTableSessionConnection()) {
+ session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
+ session.executeNonQueryStatement("USE test");
+
+ session.executeNonQueryStatement("CREATE TABLE test_sql_ttl (s1 INT32)");
+ session.executeNonQueryStatement("ALTER TABLE test_sql_ttl SET
PROPERTIES TTL=1");
+
+ for (DataNodeWrapper dataNodeWrapper :
simpleEnv.getDataNodeWrapperList()) {
+ TableSessionBuilder tableSessionBuilder = new TableSessionBuilder();
+ tableSessionBuilder.nodeUrls(
+ Collections.singletonList(dataNodeWrapper.getIpAndPortString()));
+ tableSessionBuilder.database("test");
+ try (ITableSession subSession = tableSessionBuilder.build()) {
+ subSession.executeNonQueryStatement("INSERT INTO test_sql_ttl (time,
s1) VALUES (10, 1)");
+ } catch (StatementExecutionException e) {
+ if (!e.getMessage().contains("is less than ttl time bound")) {
+ throw e;
+ }
+ }
+ }
+ } finally {
+ simpleEnv.cleanClusterEnvironment();
+ }
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index d587e25ee19..e190c27a908 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -62,10 +62,10 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
@Override
public void onComplete(TSyncLogEntriesRes response) {
if (response.getStatuses().stream()
- .anyMatch(status ->
RetryUtils.needRetryForConsensus(status.getCode()))) {
+ .anyMatch(status -> RetryUtils.needRetryForWrite(status.getCode()))) {
List<String> retryStatusMessages =
response.getStatuses().stream()
- .filter(status ->
RetryUtils.needRetryForConsensus(status.getCode()))
+ .filter(status -> RetryUtils.needRetryForWrite(status.getCode()))
.map(TSStatus::getMessage)
.collect(Collectors.toList());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a4f21d2f20f..8065fb28397 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -284,6 +286,16 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
dispatchWriteOnce(shouldDispatch);
// 3. decide if we need retry (we may decide the retry condition
instance-wise, if needed)
+ Iterator<FailedFragmentInstanceWithStatus> iterator =
failedInstances.iterator();
+ while (iterator.hasNext()) {
+ FailedFragmentInstanceWithStatus failedFragmentInstanceWithStatus =
iterator.next();
+ if (!RetryUtils.needRetryForWrite(
+ failedFragmentInstanceWithStatus.getFailureStatus().getCode())) {
+
dispatchFailures.add(failedFragmentInstanceWithStatus.getFailureStatus());
+ iterator.remove();
+ }
+ }
+
final boolean shouldRetry =
!failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum
> 1;
if (!shouldRetry) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4bb9bc2b7cb..1997a995f12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3798,6 +3798,7 @@ public class DataRegion implements IDataRegionForQuery {
DateTimeUtils.convertLongToDate(
CommonDateTimeUtils.currentTime() - ttl))));
insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length);
+ insertRowNode.setMeasurements(null);
continue;
}
// init map
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 779956b21d0..94325e9c969 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -96,7 +96,7 @@ public class PipeReceiverStatusHandler {
public void handle(
final TSStatus status, final String exceptionMessage, final String
recordMessage) {
- if (RetryUtils.needRetryForConsensus(status.getCode())) {
+ if (RetryUtils.needRetryForWrite(status.getCode())) {
LOGGER.info("IoTConsensusV2: will retry with increasing interval.
status: {}", status);
throw new PipeConsensusRetryWithIncreasingIntervalException(
exceptionMessage, Integer.MAX_VALUE);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index 2bd4b7ba395..62b656364a5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -30,7 +30,7 @@ public class RetryUtils {
T call() throws E;
}
- public static boolean needRetryForConsensus(int statusCode) {
+ public static boolean needRetryForWrite(int statusCode) {
return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
|| statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();