This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch force_ci/alter_column_datatype
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/force_ci/alter_column_datatype
by this push:
new e58e94d7caa adapt write and add tests
e58e94d7caa is described below
commit e58e94d7caa6bc4b61fb60e5c787c02cd30302e9
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jan 7 17:30:20 2025 +0800
adapt write and add tests
---
.../it/schema/IoTDBAlterColumnTypeIT.java | 145 +++++++++++++++++++++
.../it/session/IoTDBSessionRelationalIT.java | 2 +-
.../client/async/CnToDnAsyncRequestType.java | 3 +-
.../rpc/DataNodeAsyncRequestRPCHandler.java | 1 -
.../procedure/store/ProcedureFactory.java | 2 +
.../exception/DataTypeInconsistentException.java | 12 ++
.../db/storageengine/dataregion/DataRegion.java | 69 ++++++++--
.../dataregion/memtable/AbstractMemTable.java | 30 +++++
.../memtable/AlignedWritableMemChunk.java | 19 +++
.../memtable/AlignedWritableMemChunkGroup.java | 7 +
.../dataregion/memtable/IMemTable.java | 4 +
.../memtable/IWritableMemChunkGroup.java | 4 +
.../dataregion/memtable/TsFileProcessor.java | 4 +
.../dataregion/memtable/WritableMemChunkGroup.java | 18 +++
.../table/AlterOrDropTableOperationType.java | 2 +
15 files changed, 308 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
new file mode 100644
index 00000000000..019d218934e
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.schema;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.RowRecord;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static
org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBAlterColumnTypeIT {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testWriteAndAlter() throws IoTDBConnectionException,
StatementExecutionException {
+ for (TSDataType from : TSDataType.values()) {
+ for (TSDataType to : TSDataType.values()) {
+ System.out.printf("testing %s to %s%n", from, to);
+ doWriteAndAlter(from, to, false);
+ doWriteAndAlter(from, to, true);
+ }
+ }
+ }
+
+ private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush)
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) {
+ // create a table with type of "from"
+ session.executeNonQueryStatement(
+ "CREATE TABLE IF NOT EXISTS write_and_alter_column_type (s1 " + from
+ ")");
+
+ // write a point of "from"
+ session.executeNonQueryStatement(
+ "INSERT INTO write_and_alter_column_type (time, s1) VALUES (1, "
+ + genValue(from, 1)
+ + ")");
+
+ if (flush) {
+ session.executeNonQueryStatement("FLUSH");
+ }
+
+ // alter the type to "to"
+ boolean isCompatible = to.isCompatible(from);
+ if (isCompatible) {
+ session.executeNonQueryStatement(
+ "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA
TYPE " + to);
+ } else {
+ try {
+ session.executeNonQueryStatement(
+ "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET
DATA TYPE " + to);
+ } catch (StatementExecutionException e) {
+ assertEquals("", e.getMessage());
+ }
+ }
+
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select * from
write_and_alter_column_type order by time");
+ RowRecord rec = dataSet.next();
+ assertEquals(1, rec.getFields().get(0).getLongV());
+ if (to == TSDataType.BLOB) {
+ assertEquals(genValue(to, 1), rec.getFields().get(1).getBinaryV());
+ } else if (to == TSDataType.DATE) {
+ assertEquals(genValue(to, 1), rec.getFields().get(1).getDateV());
+ } else {
+ assertEquals(genValue(to, 1).toString(),
rec.getFields().get(1).toString());
+ }
+
+ // write a point
+ session.executeNonQueryStatement(
+ "INSERT INTO write_and_alter_column_type (time, s1) VALUES (2, "
+ + genValue(isCompatible ? to : from, 2)
+ + ")");
+
+ dataSet =
+ session.executeQueryStatement("select * from
write_and_alter_column_type order by time");
+ rec = dataSet.next();
+ assertEquals(1, rec.getFields().get(0).getLongV());
+ TSDataType newType = isCompatible ? to : from;
+ if (newType == TSDataType.BLOB) {
+ assertEquals(genValue(newType, 1),
rec.getFields().get(1).getBinaryV());
+ } else if (newType == TSDataType.DATE) {
+ assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV());
+ } else {
+ assertEquals(genValue(newType, 1).toString(),
rec.getFields().get(1).toString());
+ }
+
+ rec = dataSet.next();
+ assertEquals(2, rec.getFields().get(0).getLongV());
+ if (newType == TSDataType.BLOB) {
+ assertEquals(genValue(newType, 2),
rec.getFields().get(1).getBinaryV());
+ } else if (newType == TSDataType.DATE) {
+ assertEquals(genValue(newType, 2), rec.getFields().get(1).getDateV());
+ } else {
+ assertEquals(genValue(newType, 2).toString(),
rec.getFields().get(1).toString());
+ }
+
+ session.executeNonQueryStatement("DROP TABLE
write_and_alter_column_type");
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 50ae3f55b6e..5514c966d9e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -1324,7 +1324,7 @@ public class IoTDBSessionRelationalIT {
}
@SuppressWarnings("SameParameterValue")
- private Object genValue(TSDataType dataType, int i) {
+ public static Object genValue(TSDataType dataType, int i) {
switch (dataType) {
case INT32:
return i;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index d982b3886e2..ffe95cd805e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -119,6 +119,5 @@ public enum CnToDnAsyncRequestType {
ROLLBACK_TABLE_DEVICE_BLACK_LIST,
INVALIDATE_MATCHED_TABLE_DEVICE_CACHE,
DELETE_DATA_FOR_TABLE_DEVICE,
- DELETE_TABLE_DEVICE_IN_BLACK_LIST,
- ALTER_COLUMN_DATATYPE
+ DELETE_TABLE_DEVICE_IN_BLACK_LIST
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 1419482e895..b1e8753ab49 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -89,7 +89,6 @@ public abstract class DataNodeAsyncRequestRPCHandler<Response>
case ROLLBACK_VIEW_SCHEMA_BLACK_LIST:
case DELETE_VIEW:
case ALTER_VIEW:
- case ALTER_COLUMN_DATATYPE:
return new SchemaUpdateRPCHandler(
requestType,
requestId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 3805db28be4..0dbd4a33e1b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -437,6 +437,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.NEVER_FINISH_PROCEDURE;
} else if (procedure instanceof AddNeverFinishSubProcedureProcedure) {
return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE;
+ } else if (procedure instanceof AlterTableColumnDataTypeProcedure) {
+ return ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE;
}
throw new UnsupportedOperationException(
"Procedure type " + procedure.getClass() + " is not supported");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java
new file mode 100644
index 00000000000..ce0dca64cb9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DataTypeInconsistentException.java
@@ -0,0 +1,12 @@
+package org.apache.iotdb.db.exception;
+
+import org.apache.tsfile.enums.TSDataType;
+
+public class DataTypeInconsistentException extends WriteProcessException {
+
+ public DataTypeInconsistentException(TSDataType existing, TSDataType
incoming) {
+ super(
+ String.format(
+ "Inconsistent data types, existing data type: %s, incoming: %s",
existing, incoming));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 794ece53842..f6ba264bf5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
@@ -1052,8 +1053,14 @@ public class DataRegion implements IDataRegionForQuery {
> lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
// insert to sequence or unSequence file
- TsFileProcessor tsFileProcessor =
- insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
+ TsFileProcessor tsFileProcessor;
+ try {
+ tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence,
timePartitionId);
+ } catch (DataTypeInconsistentException e) {
+ tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId,
isSequence);
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ tsFileProcessor = insertToTsFileProcessor(insertRowNode, isSequence,
timePartitionId);
+ }
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
@@ -1311,11 +1318,10 @@ public class DataRegion implements IDataRegionForQuery {
return false;
}
- // register TableSchema (and maybe more) for table insertion
- registerToTsFile(insertTabletNode, tsFileProcessor);
-
try {
- tsFileProcessor.insertTablet(insertTabletNode, rangeList, results,
noFailure, infoForMetrics);
+ tsFileProcessor =
+ insertTabletWithTypeConsistencyCheck(
+ tsFileProcessor, insertTabletNode, rangeList, results,
noFailure, infoForMetrics);
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
return false;
@@ -1331,6 +1337,29 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ private TsFileProcessor insertTabletWithTypeConsistencyCheck(
+ TsFileProcessor tsFileProcessor,
+ InsertTabletNode insertTabletNode,
+ List<int[]> rangeList,
+ TSStatus[] results,
+ boolean noFailure,
+ long[] infoForMetrics)
+ throws WriteProcessException {
+ try {
+ // register TableSchema (and maybe more) for table insertion
+ registerToTsFile(insertTabletNode, tsFileProcessor);
+ tsFileProcessor.insertTablet(insertTabletNode, rangeList, results,
noFailure, infoForMetrics);
+ } catch (DataTypeInconsistentException e) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ tsFileProcessor =
+ getOrCreateTsFileProcessor(
+ tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence());
+ registerToTsFile(insertTabletNode, tsFileProcessor);
+ tsFileProcessor.insertTablet(insertTabletNode, rangeList, results,
noFailure, infoForMetrics);
+ }
+ return tsFileProcessor;
+ }
+
private void registerToTsFile(InsertNode node, TsFileProcessor
tsFileProcessor) {
final String tableName = node.getTableName();
if (tableName != null) {
@@ -1415,7 +1444,8 @@ public class DataRegion implements IDataRegionForQuery {
TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ tsFileProcessor =
+ insertRowsWithTypeConsistencyCheck(tsFileProcessor,
subInsertRowsNode, infoForMetrics);
} catch (WriteProcessException e) {
insertRowsNode
.getResults()
@@ -1424,8 +1454,7 @@ public class DataRegion implements IDataRegionForQuery {
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
executedInsertRowNodeList.addAll(subInsertRowsNode.getInsertRowNodeList());
- // register TableSchema (and maybe more) for table insertion
- registerToTsFile(subInsertRowsNode, tsFileProcessor);
+
// check memtable size and may asyncTryToFlush the work memtable
if (entry.getKey().shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
@@ -1434,6 +1463,24 @@ public class DataRegion implements IDataRegionForQuery {
return executedInsertRowNodeList;
}
+ private TsFileProcessor insertRowsWithTypeConsistencyCheck(
+ TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode,
long[] infoForMetrics)
+ throws WriteProcessException {
+ try {
+ // register TableSchema (and maybe more) for table insertion
+ registerToTsFile(subInsertRowsNode, tsFileProcessor);
+ tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ } catch (DataTypeInconsistentException e) {
+ fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
+ tsFileProcessor =
+ getOrCreateTsFileProcessor(
+ tsFileProcessor.getTimeRangeId(), tsFileProcessor.isSequence());
+ registerToTsFile(subInsertRowsNode, tsFileProcessor);
+ tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ }
+ return tsFileProcessor;
+ }
+
private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) {
for (InsertRowNode node : nodeList) {
node.updateLastCache(databaseName);
@@ -3384,7 +3431,9 @@ public class DataRegion implements IDataRegionForQuery {
TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
try {
- tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
+ tsFileProcessor =
+ insertRowsWithTypeConsistencyCheck(
+ tsFileProcessor, subInsertRowsNode, infoForMetrics);
} catch (WriteProcessException e) {
insertRowsOfOneDeviceNode
.getResults()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 027de99166b..4076c2aad7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -29,11 +29,15 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -1057,4 +1061,30 @@ public abstract class AbstractMemTable implements
IMemTable {
public boolean isTotallyGeneratedByPipe() {
return this.isTotallyGeneratedByPipe.get();
}
+
+ @Override
+ public void checkDataType(InsertNode node) throws
DataTypeInconsistentException {
+ if (node instanceof InsertRowsNode) {
+ List<InsertRowNode> insertRowNodeList = ((InsertRowsNode)
node).getInsertRowNodeList();
+ for (InsertRowNode insertRowNode : insertRowNodeList) {
+ doCheckDataType(insertRowNode);
+ }
+ } else if (node instanceof InsertMultiTabletsNode) {
+ List<InsertTabletNode> insertTabletNodeList =
+ ((InsertMultiTabletsNode) node).getInsertTabletNodeList();
+ for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+ doCheckDataType(insertTabletNode);
+ }
+ } else {
+ doCheckDataType(node);
+ }
+ }
+
+ private void doCheckDataType(InsertNode node) throws
DataTypeInconsistentException {
+ IWritableMemChunkGroup memChunkGroup = memTableMap.get(node.getDeviceID());
+ if (memChunkGroup == null) {
+ return;
+ }
+ memChunkGroup.checkDataType(node);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 3c906104c17..4beb010e287 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -558,4 +560,21 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
public boolean isAllDeleted() {
return list.isAllDeleted();
}
+
+ public void checkDataType(InsertNode node) throws
DataTypeInconsistentException {
+ for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) {
+ if (incomingSchema == null) {
+ continue;
+ }
+
+ Integer index =
measurementIndexMap.get(incomingSchema.getMeasurementName());
+ if (index != null) {
+ IMeasurementSchema existingSchema = schemaList.get(index);
+ if (existingSchema.getType() != incomingSchema.getType()) {
+ throw new DataTypeInconsistentException(
+ existingSchema.getType(), incomingSchema.getType());
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index 609c37c3581..9d5acb038a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
@@ -162,4 +164,9 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream,
isTableModel);
return memChunkGroup;
}
+
+ @Override
+ public void checkDataType(InsertNode node) throws
DataTypeInconsistentException {
+ memChunk.checkDataType(node);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 35d9ad0e95c..e5494cf2391 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -21,9 +21,11 @@ package
org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.IFullPath;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -202,4 +204,6 @@ public interface IMemTable extends WALEntryValue {
void markAsNotGeneratedByPipe();
boolean isTotallyGeneratedByPipe();
+
+ void checkDataType(InsertNode node) throws DataTypeInconsistentException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index a2d23dc9db0..b4bee0a3fae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -60,4 +62,6 @@ public interface IWritableMemChunkGroup extends WALEntryValue
{
long getCurrentTVListSize(String measurement);
long getMaxTime();
+
+ void checkDataType(InsertNode node) throws DataTypeInconsistentException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 21e8b751550..4467b360b22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -269,6 +269,8 @@ public class TsFileProcessor {
throws WriteProcessException {
ensureMemTable(infoForMetrics);
+ workMemTable.checkDataType(insertRowNode);
+
long[] memIncrements;
long memControlStartTime = System.nanoTime();
@@ -349,6 +351,7 @@ public class TsFileProcessor {
throws WriteProcessException {
ensureMemTable(infoForMetrics);
+ workMemTable.checkDataType(insertRowsNode);
long[] memIncrements;
@@ -541,6 +544,7 @@ public class TsFileProcessor {
throws WriteProcessException {
ensureMemTable(infoForMetrics);
+ workMemTable.checkDataType(insertTabletNode);
long[] memIncrements =
scheduleMemoryBlock(insertTabletNode, rangeList, results, noFailure,
infoForMetrics);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index ac5d588eb4f..c271087f050 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.DataTypeInconsistentException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -27,6 +29,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
import java.io.IOException;
@@ -199,4 +202,19 @@ public class WritableMemChunkGroup implements
IWritableMemChunkGroup {
}
return memChunkGroup;
}
+
+ @Override
+ public void checkDataType(InsertNode node) throws
DataTypeInconsistentException {
+ for (MeasurementSchema incomingSchema : node.getMeasurementSchemas()) {
+ if (incomingSchema == null) {
+ continue;
+ }
+
+ IWritableMemChunk memChunk =
memChunkMap.get(incomingSchema.getMeasurementName());
+ if (memChunk != null && memChunk.getTVList().getDataType() !=
incomingSchema.getType()) {
+ throw new DataTypeInconsistentException(
+ memChunk.getTVList().getDataType(), incomingSchema.getType());
+ }
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
index b22575f5c46..6dafc77d303 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterOrDropTableOperationType.java
@@ -53,6 +53,8 @@ public enum AlterOrDropTableOperationType {
return RENAME_TABLE;
case 5:
return DROP_TABLE;
+ case 6:
+ return ALTER_COLUMN_DATA_TYPE;
default:
throw new IllegalArgumentException();
}