This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_retry_and_ttl_failure in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 88f98381eee90aadc2e2c31e734b0c1534dbfad2 Author: Tian Jiang <[email protected]> AuthorDate: Thu Aug 28 18:22:30 2025 +0800 Fix improper retries in dispatcher & measurements not set to null when inserting row out-of-ttl --- .gitignore | 1 + .../it/session/IoTDBSessionRelationalIT.java | 38 ++++++++++++++++++++++ .../consensus/iot/client/DispatchLogHandler.java | 4 +-- .../scheduler/FragmentInstanceDispatcherImpl.java | 12 +++++++ .../db/storageengine/dataregion/DataRegion.java | 1 + .../pipe/receiver/PipeReceiverStatusHandler.java | 2 +- .../org/apache/iotdb/commons/utils/RetryUtils.java | 2 +- 7 files changed, 56 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 111f5922f1a..a489dd048da 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,4 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/ # Develocity .mvn/.gradle-enterprise/ .mvn/.develocity/ +.run/ diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index f94862c69f8..4e78947d77f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -27,12 +27,14 @@ import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.TableSessionBuilder; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; @@ -1912,4 +1914,40 @@ public class IoTDBSessionRelationalIT { EnvFactory.getEnv().initClusterEnvironment(); } } + + @Test + public void testSqlInsertWithExpiredTTL() + throws IoTDBConnectionException, StatementExecutionException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv.getConfig().getCommonConfig().setDataReplicationFactor(2); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + simpleEnv.initClusterEnvironment(1, 3); + + try (ITableSession session = simpleEnv.getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test"); + session.executeNonQueryStatement("USE test"); + + session.executeNonQueryStatement("CREATE TABLE test_sql_ttl (s1 INT32)"); + session.executeNonQueryStatement("ALTER TABLE test_sql_ttl SET PROPERTIES TTL=1"); + + for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) { + TableSessionBuilder tableSessionBuilder = new TableSessionBuilder(); + tableSessionBuilder.nodeUrls( + Collections.singletonList(dataNodeWrapper.getIpAndPortString())); + tableSessionBuilder.database("test"); + try (ITableSession subSession = tableSessionBuilder.build()) { + subSession.executeNonQueryStatement("INSERT INTO test_sql_ttl (time, s1) VALUES (10, 1)"); + } catch (StatementExecutionException e) { + if (!e.getMessage().contains("is less than ttl time bound")) { + throw e; + } + } + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index 379098582b6..fca228d3ae8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java @@ -63,10 +63,10 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe @Override public void onComplete(TSyncLogEntriesRes response) { if (response.getStatuses().stream() - .anyMatch(status -> RetryUtils.needRetryForConsensus(status.getCode()))) { + .anyMatch(status -> RetryUtils.needRetryForWrite(status.getCode()))) { List<String> retryStatusMessages = response.getStatuses().stream() - .filter(status -> RetryUtils.needRetryForConsensus(status.getCode())) + .filter(status -> RetryUtils.needRetryForWrite(status.getCode())) .map(TSStatus::getMessage) .collect(Collectors.toList()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index a3a99473e4e..258cc627038 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -289,6 +291,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { dispatchWriteOnce(shouldDispatch); // 3. decide if we need retry (we may decide the retry condition instance-wise, if needed) + Iterator<FailedFragmentInstanceWithStatus> iterator = failedInstances.iterator(); + while (iterator.hasNext()) { + FailedFragmentInstanceWithStatus failedFragmentInstanceWithStatus = iterator.next(); + if (!RetryUtils.needRetryForWrite( + failedFragmentInstanceWithStatus.getFailureStatus().getCode())) { + dispatchFailures.add(failedFragmentInstanceWithStatus.getFailureStatus()); + iterator.remove(); + } + } + final boolean shouldRetry = !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum > 1; if (!shouldRetry) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 243cccd6a41..10b5d969ed3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3970,6 +3970,7 @@ public class DataRegion implements IDataRegionForQuery { DateTimeUtils.convertLongToDate( CommonDateTimeUtils.currentTime() - ttl)))); insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length); + insertRowNode.setMeasurements(null); continue; } // init map diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index c90ba66a71b..d862030847f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -96,7 +96,7 @@ public class PipeReceiverStatusHandler { public void handle( final TSStatus status, final String exceptionMessage, final String recordMessage) { - if (RetryUtils.needRetryForConsensus(status.getCode())) { + if (RetryUtils.needRetryForWrite(status.getCode())) { LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status); throw new PipeConsensusRetryWithIncreasingIntervalException( exceptionMessage, Integer.MAX_VALUE); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java index 2bd4b7ba395..62b656364a5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java @@ -30,7 +30,7 @@ public class RetryUtils { T call() throws E; } - public static boolean needRetryForConsensus(int statusCode) { + public static boolean needRetryForWrite(int statusCode) { return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode() || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
