This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/alter_column_datatype
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/force_ci/alter_column_datatype 
by this push:
     new e58e94d7caa adapt write and add tests
e58e94d7caa is described below

commit e58e94d7caa6bc4b61fb60e5c787c02cd30302e9
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jan 7 17:30:20 2025 +0800

    adapt write and add tests
---
 .../it/schema/IoTDBAlterColumnTypeIT.java          | 145 +++++++++++++++++++++
 .../it/session/IoTDBSessionRelationalIT.java       |   2 +-
 .../client/async/CnToDnAsyncRequestType.java       |   3 +-
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 -
 .../procedure/store/ProcedureFactory.java          |   2 +
 .../exception/DataTypeInconsistentException.java   |  12 ++
 .../db/storageengine/dataregion/DataRegion.java    |  69 ++++++++--
 .../dataregion/memtable/AbstractMemTable.java      |  30 +++++
 .../memtable/AlignedWritableMemChunk.java          |  19 +++
 .../memtable/AlignedWritableMemChunkGroup.java     |   7 +
 .../dataregion/memtable/IMemTable.java             |   4 +
 .../memtable/IWritableMemChunkGroup.java           |   4 +
 .../dataregion/memtable/TsFileProcessor.java       |   4 +
 .../dataregion/memtable/WritableMemChunkGroup.java |  18 +++
 .../table/AlterOrDropTableOperationType.java       |   2 +
 15 files changed, 308 insertions(+), 14 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
new file mode 100644
index 00000000000..019d218934e
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.schema;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.RowRecord;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBAlterColumnTypeIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWriteAndAlter() throws IoTDBConnectionException, StatementExecutionException {
+    for (TSDataType from : TSDataType.values()) {
+      for (TSDataType to : TSDataType.values()) {
+        System.out.printf("testing %s to %s%n", from, to);
+        doWriteAndAlter(from, to, false);
+        doWriteAndAlter(from, to, true);
+      }
+    }
+  }
+
+  private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+      // create a table with type of "from"
+      session.executeNonQueryStatement(
+          "CREATE TABLE IF NOT EXISTS write_and_alter_column_type (s1 " + from + ")");
+
+      // write a point of "from"
+      session.executeNonQueryStatement(
+          "INSERT INTO write_and_alter_column_type (time, s1) VALUES (1, "
+              + genValue(from, 1)
+              + ")");
+
+      if (flush) {
+        session.executeNonQueryStatement("FLUSH");
+      }
+
+      // alter the type to "to"
+      boolean isCompatible = to.isCompatible(from);
+      if (isCompatible) {
+        session.executeNonQueryStatement(
+            "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to);
+      } else {
+        try {
+          session.executeNonQueryStatement(
+              "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to);
+        } catch (StatementExecutionException e) {
+          assertEquals("", e.getMessage());
+        }
+      }
+
+      SessionDataSet dataSet =
+          session.executeQueryStatement("select * from write_and_alter_column_type order by time");
+      RowRecord rec = dataSet.next();
+      assertEquals(1, rec.getFields().get(0).getLongV());
+      if (to == TSDataType.BLOB) {
+        assertEquals(genValue(to, 1), rec.getFields().get(1).getBinaryV());
+      } else if (to == TSDataType.DATE) {
+        assertEquals(genValue(to, 1), rec.getFields().get(1).getDateV());
+      } else {
+        assertEquals(genValue(to, 1).toString(), rec.getFields().get(1).toString());
+      }
+
+      // write a point
+      session.executeNonQueryStatement(
+          "INSERT INTO write_and_alter_column_type (time, s1) VALUES (2, "
+              + genValue(isCompatible ? to : from, 2)
+              + ")");
+
+      dataSet =
+          session.executeQueryStatement("select * from write_and_alter_column_type order by time");
+      rec = dataSet.next();
+      assertEquals(1, rec.getFields().get(0).getLongV());
+      TSDataType newType = isCompatible ? to : from;
+      if (newType == TSDataType.BLOB) {
+        assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV());
+      } else if (newType == TSDataType.DATE) {
+        assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV());
+      } else {
+        assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString());
+      }
+
+      rec = dataSet.next();
+      assertEquals(2, rec.getFields().get(0).getLongV());
+      if (newType == TSDataType.BLOB) {
+        assertEquals(genValue(newType, 2), rec.getFields().get(1).getBinaryV());
+      } else if (newType == TSDataType.DATE) {
+        assertEquals(genValue(newType, 2), rec.getFields().get(1).getDateV());
+      } else {
+        assertEquals(genValue(newType, 2).toString(), rec.getFields().get(1).toString());
+      }
+
+      session.executeNonQueryStatement("DROP TABLE write_and_alter_column_type");
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 50ae3f55b6e..5514c966d9e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -1324,7 +1324,7 @@ public class IoTDBSessionRelationalIT {
   }
 
   @SuppressWarnings("SameParameterValue")
-  private Object genValue(TSDataType dataType, int i) {
+  public static Object genValue(TSDataType dataType, int i) {
     switch (dataType) {
       case INT32:
         return i;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index d982b3886e2..ffe95cd805e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -119,6 +119,5 @@ public enum CnToDnAsyncRequestType {
   ROLLBACK_TABLE_DEVICE_BLACK_LIST,
   INVALIDATE_MATCHED_TABLE_DEVICE_CACHE,
   DELETE_DATA_FOR_TABLE_DEVICE,
-  DELETE_TABLE_DEVICE_IN_BLACK_LIST,
-  ALTER_COLUMN_DATATYPE
+  DELETE_TABLE_DEVICE_IN_BLACK_LIST
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 1419482e895..b1e8753ab49 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -89,7 +89,6 @@ public abstract class DataNodeAsyncRequestRPCHandler<Response>
       case ROLLBACK_VIEW_SCHEMA_BLACK_LIST:
       case DELETE_VIEW:
       case ALTER_VIEW:
-      case ALTER_COLUMN_DATATYPE:
         return new SchemaUpdateRPCHandler(
             requestType,
             requestId,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 3805db28be4..0dbd4a33e1b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -437,6 +437,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.NEVER_FINISH_PROCEDURE;
     } else if (procedure instanceof AddNeverFinishSubProcedureProcedure) {
       return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE;
+    } else if (procedure instanceof AlterTableColumnDataTypeProcedure) {
+      return ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE;
     }
     throw new UnsupportedOperationException(
         "Procedure type " + procedure.getClass() + " is not supported");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java
new file mode 100644
index 00000000000..ce0dca64cb9
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java
@@ -0,0 +1,12 @@
+package org.apache.iotdb.db.exception;
+
+import org.apache.tsfile.enums.TSDataType;
+
+public class DataTypeInconsistentException extends WriteProcessException {
+
+  public DataTypeInconsistentException(TSDataType existing, TSDataType incoming) {
+    super(
+        String.format(
+            "Inconsistent data types, existing data type: %s, incoming: %s", existing, incoming));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 794ece53842..f6ba264bf5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -1052,8 +1053,14 @@ public class DataRegion implements IDataRegionForQuery {
                   > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID());
 
       // insert to sequence or unSequence file
-      TsFileProcessor tsFileProcessor =
-          insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
+      TsFileProcessor tsFileProcessor;
+      try {
+        tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
+      } catch (DataTypeInconsistentException e) {
+        tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, isSequence);
+        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+        tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
+      }
 
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
@@ -1311,11 +1318,10 @@ public class DataRegion implements IDataRegionForQuery {
       return false;
     }
 
-    // register TableSchema (and maybe more) for table insertion
-    registerToTsFile(insertTabletNode, tsFileProcessor);
-
     try {
-      tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
+      tsFileProcessor =
+          insertTabletWithTypeConsistencyCheck(
+              tsFileProcessor, insertTabletNode, rangeList, results, noFailure, infoForMetrics);
     } catch (WriteProcessRejectException e) {
       logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
       return false;
@@ -1331,6 +1337,29 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
+  private TsFileProcessor insertTabletWithTypeConsistencyCheck(
+      TsFileProcessor tsFileProcessor,
+      InsertTabletNode insertTabletNode,
+      List<int[]> rangeList,
+      TSStatus[] results,
+      boolean noFailure,
+      long[] infoForMetrics)
+      throws WriteProcessException {
+    try {
+      // register TableSchema (and maybe more) for table insertion
+      registerToTsFile(insertTabletNode, tsFileProcessor);
+      tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
+    } catch (DataTypeInconsistentException e) {
+      fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      tsFileProcessor =
+          getOrCreateTsFileProcessor(
+              tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence());
+      registerToTsFile(insertTabletNode, tsFileProcessor);
+      tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
+    }
+    return tsFileProcessor;
+  }
+
   private void registerToTsFile(InsertNode node, TsFileProcessor tsFileProcessor) {
     final String tableName = node.getTableName();
     if (tableName != null) {
@@ -1415,7 +1444,8 @@ public class DataRegion implements IDataRegionForQuery {
       TsFileProcessor tsFileProcessor = entry.getKey();
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
-        tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+        tsFileProcessor =
+            insertRowsWithTypeConsistencyCheck(tsFileProcessor, subInsertRowsNode, infoForMetrics);
       } catch (WriteProcessException e) {
         insertRowsNode
             .getResults()
@@ -1424,8 +1454,7 @@ public class DataRegion implements IDataRegionForQuery {
                 RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
       }
       executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
-      // register TableSchema (and maybe more) for table insertion
-      registerToTsFile(subInsertRowsNode, tsFileProcessor);
+
       // check memtable size and may asyncTryToFlush the work memtable
       if (entry.getKey().shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
@@ -1434,6 +1463,24 @@ public class DataRegion implements IDataRegionForQuery {
     return executedInsertRowNodeList;
   }
 
+  private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+      TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, long[] infoForMetrics)
+      throws WriteProcessException {
+    try {
+      // register TableSchema (and maybe more) for table insertion
+      registerToTsFile(subInsertRowsNode, tsFileProcessor);
+      tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+    } catch (DataTypeInconsistentException e) {
+      fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+      tsFileProcessor =
+          getOrCreateTsFileProcessor(
+              tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence());
+      registerToTsFile(subInsertRowsNode, tsFileProcessor);
+      tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+    }
+    return tsFileProcessor;
+  }
+
   private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
     for (InsertRowNode node : nodeList) {
       node.updateLastCache(databaseName);
@@ -3384,7 +3431,9 @@ public class DataRegion implements IDataRegionForQuery {
         TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
-          tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+          tsFileProcessor =
+              insertRowsWithTypeConsistencyCheck(
+                  tsFileProcessor, subInsertRowsNode, infoForMetrics);
         } catch (WriteProcessException e) {
           insertRowsOfOneDeviceNode
               .getResults()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 027de99166b..4076c2aad7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -29,11 +29,15 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -1057,4 +1061,30 @@ public abstract class AbstractMemTable implements IMemTable {
   public boolean isTotallyGeneratedByPipe() {
     return this.isTotallyGeneratedByPipe.get();
   }
+
+  @Override
+  public void checkDataType(InsertNode node) throws DataTypeInconsistentException {
+    if (node instanceof InsertRowsNode) {
+      List<InsertRowNode> insertRowNodeList = ((InsertRowsNode) node).getInsertRowNodeList();
+      for (InsertRowNode insertRowNode : insertRowNodeList) {
+        doCheckDataType(insertRowNode);
+      }
+    } else if (node instanceof InsertMultiTabletsNode) {
+      List<InsertTabletNode> insertTabletNodeList =
+          ((InsertMultiTabletsNode) node).getInsertTabletNodeList();
+      for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+        doCheckDataType(insertTabletNode);
+      }
+    } else {
+      doCheckDataType(node);
+    }
+  }
+
+  private void doCheckDataType(InsertNode node) throws DataTypeInconsistentException {
+    IWritableMemChunkGroup memChunkGroup = memTableMap.get(node.getDeviceID());
+    if (memChunkGroup == null) {
+      return;
+    }
+    memChunkGroup.checkDataType(node);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 3c906104c17..4beb010e287 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -558,4 +560,21 @@ public class AlignedWritableMemChunk implements IWritableMemChunk {
   public boolean isAllDeleted() {
     return list.isAllDeleted();
   }
+
+  public void checkDataType(InsertNode node) throws DataTypeInconsistentException {
+    for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) {
+      if (incomingSchema == null) {
+        continue;
+      }
+
+      Integer index = measurementIndexMap.get(incomingSchema.getMeasurementName());
+      if (index != null) {
+        IMeasurementSchema existingSchema = schemaList.get(index);
+        if (existingSchema.getType() != incomingSchema.getType()) {
+          throw new DataTypeInconsistentException(
+              existingSchema.getType(), incomingSchema.getType());
+        }
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 609c37c3581..9d5acb038a0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 
@@ -162,4 +164,9 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup {
     memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream, isTableModel);
     return memChunkGroup;
   }
+
+  @Override
+  public void checkDataType(InsertNode node) throws DataTypeInconsistentException {
+    memChunk.checkDataType(node);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 35d9ad0e95c..e5494cf2391 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -202,4 +204,6 @@ public interface IMemTable extends WALEntryValue {
   void markAsNotGeneratedByPipe();
 
   boolean isTotallyGeneratedByPipe();
+
+  void checkDataType(InsertNode node) throws DataTypeInconsistentException;
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index a2d23dc9db0..b4bee0a3fae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 
@@ -60,4 +62,6 @@ public interface IWritableMemChunkGroup extends WALEntryValue {
   long getCurrentTVListSize(String measurement);
 
   long getMaxTime();
+
+  void checkDataType(InsertNode node) throws DataTypeInconsistentException;
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 21e8b751550..4467b360b22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -269,6 +269,8 @@ public class TsFileProcessor {
       throws WriteProcessException {
 
     ensureMemTable(infoForMetrics);
+    workMemTable.checkDataType(insertRowNode);
+
     long[] memIncrements;
 
     long memControlStartTime = System.nanoTime();
@@ -349,6 +351,7 @@ public class TsFileProcessor {
       throws WriteProcessException {
 
     ensureMemTable(infoForMetrics);
+    workMemTable.checkDataType(insertRowsNode);
 
     long[] memIncrements;
 
@@ -541,6 +544,7 @@ public class TsFileProcessor {
       throws WriteProcessException {
 
     ensureMemTable(infoForMetrics);
+    workMemTable.checkDataType(insertTabletNode);
 
     long[] memIncrements =
         scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index ac5d588eb4f..c271087f050 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -27,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -199,4 +202,19 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup {
     }
     return memChunkGroup;
   }
+
+  @Override
+  public void checkDataType(InsertNode node) throws DataTypeInconsistentException {
+    for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) {
+      if (incomingSchema == null) {
+        continue;
+      }
+
+      IWritableMemChunk memChunk = memChunkMap.get(incomingSchema.getMeasurementName());
+      if (memChunk != null && memChunk.getTVList().getDataType() != incomingSchema.getType()) {
+        throw new DataTypeInconsistentException(
+            memChunk.getTVList().getDataType(), incomingSchema.getType());
+      }
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
index b22575f5c46..6dafc77d303 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
@@ -53,6 +53,8 @@ public enum AlterOrDropTableOperationType {
         return RENAME_TABLE;
       case 5:
         return DROP_TABLE;
+      case 6:
+        return ALTER_COLUMN_DATA_TYPE;
       default:
         throw new IllegalArgumentException();
     }

Reply via email to