This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e5fcba50ba Fix cross partition write after alter data type (#17082)
8e5fcba50ba is described below
commit 8e5fcba50ba91300ed36ce70b57cb45a35580095
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jan 27 09:29:37 2026 +0800
Fix cross partition write after alter data type (#17082)
* Fix
* spotless
---
.../it/env/cluster/node/AbstractNodeWrapper.java | 2 +-
.../db/it/schema/IoTDBAlterTimeSeriesTypeIT.java | 36 +++++++++++++++++++
.../treemodel/manual/IoTDBPipePermissionIT.java | 42 ++++++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 13 +++----
.../apache/iotdb/commons/conf/CommonConfig.java | 3 +-
5 files changed, 85 insertions(+), 11 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 1a349034469..cbcf4ca3fd5 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -572,7 +572,7 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
Thread.currentThread().interrupt();
logger.error("Waiting node to shutdown error.", e);
}
- logger.info("In test {} {} started forcibly.", getTestLogDirName(),
getId());
+ logger.info("In test {} {} stopped forcibly.", getTestLogDirName(),
getId());
}
@Override
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
index f7d12f10b8f..181fd2da2b2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.it.schema;
+import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.utils.MetadataUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.isession.ISession;
@@ -2749,4 +2750,39 @@ public class IoTDBAlterTimeSeriesTypeIT {
throw new RuntimeException(e);
}
}
+
+ @Test
+ public void testCrossPartitionWrite()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ session.executeNonQueryStatement("CREATE DATABASE root.cross_partition");
+ session.executeNonQueryStatement(
+ "CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH
DATATYPE=INT32,ENCODING=RLE");
+
+ // Insert data into two partitions
+ Tablet tablet =
+ new Tablet(
+ "root.cross_partition.device1",
+ Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32,
TSEncoding.RLE)));
+ tablet.addTimestamp(0, 0);
+ tablet.addValue("sensor1", 0, 0);
+ tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
+ tablet.addValue("sensor1", 1, 1);
+ session.insertTablet(tablet);
+
+ session.executeNonQueryStatement(
+ "ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE
INT64");
+
+ // Insert data with altered type
+ tablet =
+ new Tablet(
+ "root.cross_partition.device1",
+ Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64,
TSEncoding.RLE)));
+ tablet.addTimestamp(0, 0);
+ tablet.addValue("sensor1", 0, 0L);
+ tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
+ tablet.addValue("sensor1", 1, 1L);
+ session.insertTablet(tablet);
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 7ff4ff75efe..35403c4dc6d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -233,6 +233,48 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
}
}
+ @Test
+ public void testSourcePermissionRestart() throws SQLException {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ TestUtils.executeNonQuery(senderEnv, "create user `thulab`
'passwD@123456'", connection);
+ TestUtils.executeNonQueries(
+ senderEnv, Collections.singletonList("grant READ on root.** to user
thulab"));
+
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab'"
+ + ", 'password'='passwD@123456')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create aligned timeSeries root.vehicle.plane(temperature
DOUBLE, pressure INT32)"));
+ TestUtils.executeNonQueries(
+ receiverEnv,
+ Arrays.asList(
+ "create aligned timeSeries root.vehicle.plane(temperature
DOUBLE, pressure INT32)"));
+
+ TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start
pipe a2b"));
+
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.vehicle.plane(temperature, pressure) values
(36.5, 1103)"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(pressure) from root.vehicle.plane",
+ "count(root.vehicle.plane.pressure),",
+ Collections.singleton("1,"));
+ }
+ }
+
@Test
public void testSourcePermission() {
TestUtils.executeNonQuery(senderEnv, "create user `thulab`
'passwD@123456'", null);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3768f05941b..0061f41f995 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1487,15 +1487,10 @@ public class DataRegion implements IDataRegionForQuery {
registerToTsFile(insertTabletNode, tsFileProcessor);
tsFileProcessor.insertTablet(insertTabletNode, rangeList, results,
noFailure, infoForMetrics);
} catch (DataTypeInconsistentException e) {
- // flush both MemTables so that the new type can be inserted into a new
MemTable
- TsFileProcessor workSequenceProcessor =
workSequenceTsFileProcessors.get(timePartitionId);
- if (workSequenceProcessor != null) {
- fileFlushPolicy.apply(this, workSequenceProcessor,
workSequenceProcessor.isSequence());
- }
- TsFileProcessor workUnsequenceProcessor =
workUnsequenceTsFileProcessors.get(timePartitionId);
- if (workUnsequenceProcessor != null) {
- fileFlushPolicy.apply(this, workUnsequenceProcessor,
workUnsequenceProcessor.isSequence());
- }
+ // flush all MemTables so that the new type can be inserted into a new
MemTable
+ // cannot just flush the current TsFileProcessor, because the new type
may be inserted into
+ // other TsFileProcessors of this region
+ asyncCloseAllWorkingTsFileProcessors();
throw e;
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e0225b698a9..b0657272fc5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -55,6 +55,7 @@ public class CommonConfig {
public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties";
public static final String SYSTEM_CONFIG_TEMPLATE_NAME =
"iotdb-system.properties.template";
private static final Logger logger =
LoggerFactory.getLogger(CommonConfig.class);
+ public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L;
// Open ID Secret
private String openIdProviderUrl = "";
@@ -184,7 +185,7 @@ public class CommonConfig {
private long timePartitionOrigin = 0;
/** Time partition interval in milliseconds. */
- private long timePartitionInterval = 604_800_000;
+ private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL;
/** This variable set timestamp precision as millisecond, microsecond or
nanosecond. */
private String timestampPrecision = "ms";