This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e5fcba50ba Fix cross partition write after alter data type (#17082)
8e5fcba50ba is described below

commit 8e5fcba50ba91300ed36ce70b57cb45a35580095
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jan 27 09:29:37 2026 +0800

    Fix cross partition write after alter data type (#17082)
    
    * Fi
    
    * spotless
---
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  2 +-
 .../db/it/schema/IoTDBAlterTimeSeriesTypeIT.java   | 36 +++++++++++++++++++
 .../treemodel/manual/IoTDBPipePermissionIT.java    | 42 ++++++++++++++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 13 +++----
 .../apache/iotdb/commons/conf/CommonConfig.java    |  3 +-
 5 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 1a349034469..cbcf4ca3fd5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -572,7 +572,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
       Thread.currentThread().interrupt();
       logger.error("Waiting node to shutdown error.", e);
     }
-    logger.info("In test {} {} started forcibly.", getTestLogDirName(), 
getId());
+    logger.info("In test {} {} stopped forcibly.", getTestLogDirName(), 
getId());
   }
 
   @Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
index f7d12f10b8f..181fd2da2b2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.it.schema;
 
+import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.utils.MetadataUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.isession.ISession;
@@ -2749,4 +2750,39 @@ public class IoTDBAlterTimeSeriesTypeIT {
       throw new RuntimeException(e);
     }
   }
+
+  @Test
+  public void testCrossPartitionWrite()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.executeNonQueryStatement("CREATE DATABASE root.cross_partition");
+      session.executeNonQueryStatement(
+          "CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH 
DATATYPE=INT32,ENCODING=RLE");
+
+      // Insert data into two partitions
+      Tablet tablet =
+          new Tablet(
+              "root.cross_partition.device1",
+              Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32, 
TSEncoding.RLE)));
+      tablet.addTimestamp(0, 0);
+      tablet.addValue("sensor1", 0, 0);
+      tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
+      tablet.addValue("sensor1", 1, 1);
+      session.insertTablet(tablet);
+
+      session.executeNonQueryStatement(
+          "ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE 
INT64");
+
+      // Insert data with altered type
+      tablet =
+          new Tablet(
+              "root.cross_partition.device1",
+              Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64, 
TSEncoding.RLE)));
+      tablet.addTimestamp(0, 0);
+      tablet.addValue("sensor1", 0, 0L);
+      tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
+      tablet.addValue("sensor1", 1, 1L);
+      session.insertTablet(tablet);
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 7ff4ff75efe..35403c4dc6d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -233,6 +233,48 @@ public class IoTDBPipePermissionIT extends AbstractPipeDualTreeModelManualIT {
     }
   }
 
+  @Test
+  public void testSourcePermissionRestart() throws SQLException {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      TestUtils.executeNonQuery(senderEnv, "create user `thulab` 
'passwD@123456'", connection);
+      TestUtils.executeNonQueries(
+          senderEnv, Collections.singletonList("grant READ on root.** to user 
thulab"));
+
+      statement.execute(
+          String.format(
+              "create pipe a2b"
+                  + " with source ("
+                  + "'user'='thulab'"
+                  + ", 'password'='passwD@123456')"
+                  + " with sink ("
+                  + "'node-urls'='%s')",
+              
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "create aligned timeSeries root.vehicle.plane(temperature 
DOUBLE, pressure INT32)"));
+      TestUtils.executeNonQueries(
+          receiverEnv,
+          Arrays.asList(
+              "create aligned timeSeries root.vehicle.plane(temperature 
DOUBLE, pressure INT32)"));
+
+      TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start 
pipe a2b"));
+
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.vehicle.plane(temperature, pressure) values 
(36.5, 1103)"));
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(pressure) from root.vehicle.plane",
+          "count(root.vehicle.plane.pressure),",
+          Collections.singleton("1,"));
+    }
+  }
+
   @Test
   public void testSourcePermission() {
     TestUtils.executeNonQuery(senderEnv, "create user `thulab` 
'passwD@123456'", null);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3768f05941b..0061f41f995 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1487,15 +1487,10 @@ public class DataRegion implements IDataRegionForQuery {
       registerToTsFile(insertTabletNode, tsFileProcessor);
       tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, 
noFailure, infoForMetrics);
     } catch (DataTypeInconsistentException e) {
-      // flush both MemTables so that the new type can be inserted into a new 
MemTable
-      TsFileProcessor workSequenceProcessor = 
workSequenceTsFileProcessors.get(timePartitionId);
-      if (workSequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workSequenceProcessor, 
workSequenceProcessor.isSequence());
-      }
-      TsFileProcessor workUnsequenceProcessor = 
workUnsequenceTsFileProcessors.get(timePartitionId);
-      if (workUnsequenceProcessor != null) {
-        fileFlushPolicy.apply(this, workUnsequenceProcessor, 
workUnsequenceProcessor.isSequence());
-      }
+      // flush all MemTables so that the new type can be inserted into a new MemTable
+      // cannot just flush the current TsFileProcessor, because the new type may be inserted into
+      // other TsFileProcessors of this region
+      asyncCloseAllWorkingTsFileProcessors();
       throw e;
     } catch (WriteProcessRejectException e) {
       logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e0225b698a9..b0657272fc5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -55,6 +55,7 @@ public class CommonConfig {
   public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties";
   public static final String SYSTEM_CONFIG_TEMPLATE_NAME = 
"iotdb-system.properties.template";
   private static final Logger logger = 
LoggerFactory.getLogger(CommonConfig.class);
+  public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L;
 
   // Open ID Secret
   private String openIdProviderUrl = "";
@@ -184,7 +185,7 @@ public class CommonConfig {
   private long timePartitionOrigin = 0;
 
   /** Time partition interval in milliseconds. */
-  private long timePartitionInterval = 604_800_000;
+  private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL;
 
   /** This variable set timestamp precision as millisecond, microsecond or nanosecond. */
   private String timestampPrecision = "ms";

Reply via email to