This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 69af53e57a1 [Table Model] Implement table column extension
69af53e57a1 is described below
commit 69af53e57a1975ad4eebee263b90fed5685bcd21
Author: Marcos_Zyk <[email protected]>
AuthorDate: Thu Jun 13 09:37:44 2024 +0800
[Table Model] Implement table column extension
---
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../request/write/table/AddTableColumnPlan.java | 93 ++++++
.../iotdb/confignode/manager/ConfigManager.java | 16 +
.../iotdb/confignode/manager/ProcedureManager.java | 54 ++++
.../manager/schema/ClusterSchemaManager.java | 59 ++++
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../persistence/schema/ClusterSchemaInfo.java | 36 +++
.../confignode/persistence/schema/ConfigMTree.java | 39 ++-
.../impl/schema/table/AddTableColumnProcedure.java | 329 +++++++++++++++++++++
.../state/schema/AddTableColumnState.java} | 18 +-
.../procedure/store/ProcedureFactory.java | 6 +
.../confignode/procedure/store/ProcedureType.java | 2 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +
.../iotdb/db/protocol/client/ConfigNodeClient.java | 7 +
.../impl/DataNodeInternalRPCServiceImpl.java | 22 ++
.../config/executor/ClusterConfigTaskExecutor.java | 49 +++
.../config/executor/IConfigTaskExecutor.java | 9 +
.../relational/AlterTableAddColumnTask.java | 96 ++++++
.../db/schemaengine/table/DataNodeTableCache.java | 64 ++++
.../iotdb/db/schemaengine/table/ITableCache.java | 12 +
.../schema/table/AlterTableOperationType.java} | 26 +-
.../apache/iotdb/commons/schema/table/TsTable.java | 14 +
.../schema/table/TsTableInternalRPCType.java | 12 +-
.../table/column/TsTableColumnSchemaUtil.java | 37 +++
.../src/main/thrift/confignode.thrift | 11 +
26 files changed, 1003 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 1ddd33aca48..7ac66ac2219 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -115,6 +115,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePla
import
org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import
org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
@@ -408,6 +409,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case CommitCreateTable:
plan = new CommitCreateTablePlan();
break;
+ case AddTableColumn:
+ plan = new AddTableColumnPlan();
+ break;
case GetNodePathsPartition:
plan = new GetNodePathsPartitionPlan();
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 66edc095bbc..dd91aef20c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -152,6 +152,7 @@ public enum ConfigPhysicalPlanType {
PreCreateTable((short) 850),
RollbackCreateTable((short) 851),
CommitCreateTable((short) 852),
+ AddTableColumn((short) 853),
/** Deprecated types for sync, restored them for upgrade. */
@Deprecated
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AddTableColumnPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AddTableColumnPlan.java
new file mode 100644
index 00000000000..eda7a26af74
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/table/AddTableColumnPlan.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.table;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Config-node physical plan of type {@link ConfigPhysicalPlanType#AddTableColumn}. It carries the
+ * target database and table, the column schemas involved, and a flag telling whether the plan
+ * adds the columns or rolls a previous addition back.
+ */
+public class AddTableColumnPlan extends ConfigPhysicalPlan {
+
+ // Name of the database that owns the table.
+ private String database;
+
+ // Name of the table whose columns are changed.
+ private String tableName;
+
+ // Column schemas carried by this plan.
+ private List<TsTableColumnSchema> columnSchemaList;
+
+ // true when this plan is a rollback of an addition, false for the addition itself.
+ private boolean isRollback;
+
+ /** Creates an empty plan; the fields are filled in by {@link #deserializeImpl(ByteBuffer)}. */
+ public AddTableColumnPlan() {
+ super(ConfigPhysicalPlanType.AddTableColumn);
+ }
+
+ /**
+ * Creates a fully populated plan.
+ *
+ * @param database name of the database that owns the table
+ * @param tableName name of the table
+ * @param columnSchemaList column schemas carried by the plan; the list is stored as-is, not copied
+ * @param isRollback true for a rollback plan, false for an addition
+ */
+ public AddTableColumnPlan(
+ String database,
+ String tableName,
+ List<TsTableColumnSchema> columnSchemaList,
+ boolean isRollback) {
+ super(ConfigPhysicalPlanType.AddTableColumn);
+ this.database = database;
+ this.tableName = tableName;
+ this.columnSchemaList = columnSchemaList;
+ this.isRollback = isRollback;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ /** Returns the stored list itself (no defensive copy). */
+ public List<TsTableColumnSchema> getColumnSchemaList() {
+ return columnSchemaList;
+ }
+
+ public boolean isRollback() {
+ return isRollback;
+ }
+
+ /**
+ * Writes the plan type followed by database, table name, column schema list and rollback flag.
+ * The field order must stay in sync with {@link #deserializeImpl(ByteBuffer)}.
+ */
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeShort(getType().getPlanType());
+
+ ReadWriteIOUtils.write(database, stream);
+ ReadWriteIOUtils.write(tableName, stream);
+ TsTableColumnSchemaUtil.serialize(columnSchemaList, stream);
+ ReadWriteIOUtils.write(isRollback, stream);
+ }
+
+ /**
+ * Reads database, table name, column schema list and rollback flag, in that order.
+ * NOTE(review): the leading plan-type short written by serializeImpl is not read here;
+ * presumably the caller consumes it before dispatching to this method - confirm.
+ */
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ this.database = ReadWriteIOUtils.readString(buffer);
+ this.tableName = ReadWriteIOUtils.readString(buffer);
+ this.columnSchemaList =
TsTableColumnSchemaUtil.deserializeColumnSchemaList(buffer);
+ this.isRollback = ReadWriteIOUtils.readBool(buffer);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4a3a0f7cf3b..c50977afab7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PathPatternUtil;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.AlterTableOperationType;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
@@ -123,6 +124,7 @@ import
org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
@@ -2385,4 +2387,18 @@ public class ConfigManager implements IManager {
return status;
}
}
+
+  /**
+   * Handles an ALTER TABLE request by dispatching on its operation type.
+   *
+   * @param req the alter-table request; {@code req.operationType} selects the operation
+   * @return the non-success status from {@code confirmLeader()} if that check fails, otherwise the
+   *     status returned by the handler of the requested operation
+   * @throws IllegalArgumentException if the resolved operation type has no handler here
+   */
+  public TSStatus alterTable(TAlterTableReq req) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+    switch (AlterTableOperationType.getType(req.operationType)) {
+      case ADD_COLUMN:
+        return procedureManager.alterTableAddColumn(req);
+      default:
+        // Name the offending type so the failure can be diagnosed from the stack trace alone.
+        throw new IllegalArgumentException(
+            "Unsupported alter table operation type: " + req.operationType);
+    }
+  }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 1df15ebb0e7..7e5b74163ce 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -78,6 +80,7 @@ import
org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedu
import org.apache.iotdb.confignode.procedure.impl.schema.SetTTLProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.DropConsumerProcedure;
@@ -100,6 +103,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
@@ -1290,6 +1294,56 @@ public class ProcedureManager {
}
}
+  /**
+   * Runs an add-column request as an {@link AddTableColumnProcedure} and waits for it to finish.
+   *
+   * <p>While holding this manager's monitor, the procedures known to the executor are scanned:
+   *
+   * <ul>
+   *   <li>if an add-column procedure with the same queryId exists, its id is reused and awaited
+   *       instead of submitting a new procedure;
+   *   <li>otherwise, if an add-column procedure targets the same database and table, the request
+   *       is rejected with {@code OVERLAP_WITH_EXISTING_TASK};
+   *   <li>otherwise a new procedure is submitted.
+   * </ul>
+   *
+   * @param req request carrying database, table name, queryId and the serialized column schemas
+   * @return {@code StatusUtils.OK} on success, otherwise the first recorded procedure status or the
+   *     overlap status
+   */
+  public TSStatus alterTableAddColumn(TAlterTableReq req) {
+    String database = req.database;
+    String tableName = req.tableName;
+    String queryId = req.queryId;
+    List<TsTableColumnSchema> columnSchemaList =
+        TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.updateInfo);
+
+    long procedureId = -1;
+    synchronized (this) {
+      boolean hasOverlappedTask = false;
+      ProcedureType type;
+      AddTableColumnProcedure addTableColumnProcedure;
+      for (Procedure<?> procedure : executor.getProcedures().values()) {
+        type = ProcedureFactory.getProcedureType(procedure);
+        if (type == null || !type.equals(ProcedureType.ADD_TABLE_COLUMN_PROCEDURE)) {
+          continue;
+        }
+        addTableColumnProcedure = (AddTableColumnProcedure) procedure;
+        // Same queryId: attach to the existing procedure rather than starting a second one.
+        if (queryId.equals(addTableColumnProcedure.getQueryId())) {
+          procedureId = addTableColumnProcedure.getProcId();
+          break;
+        }
+        // Different query on the same table: only one add-column task per table at a time.
+        if (database.equals(addTableColumnProcedure.getDatabase())
+            && tableName.equals(addTableColumnProcedure.getTableName())) {
+          hasOverlappedTask = true;
+          break;
+        }
+      }
+
+      if (procedureId == -1) {
+        if (hasOverlappedTask) {
+          // The conflicting task is an add-column procedure, so say so (was "dropping table").
+          return RpcUtils.getStatus(
+              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
+              "Some other task is adding column to table with same name.");
+        }
+        procedureId =
+            this.executor.submitProcedure(
+                new AddTableColumnProcedure(database, tableName, queryId, columnSchemaList));
+      }
+    }
+    List<TSStatus> procedureStatus = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
+    if (isSucceed) {
+      return StatusUtils.OK;
+    } else {
+      return procedureStatus.get(0);
+    }
+  }
+
// ======================================================
/*
GET-SET Region
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 8faf8595ab0..36c8db18564 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -28,7 +28,9 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
@@ -52,6 +54,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.database.SetDataRepli
import
org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
@@ -1110,6 +1113,62 @@ public class ClusterSchemaManager {
schemaQuotaStatistics.setSeriesThreshold(seriesThreshold);
}
+ /** Looks up the table with the given name in the given database via clusterSchemaInfo. */
+ public TsTable getTable(String database, String tableName) {
+ return clusterSchemaInfo.getTsTable(database, tableName);
+ }
+
+  /**
+   * Checks that the target table is among the currently using tables and filters the requested
+   * columns down to those the table does not have yet.
+   *
+   * @param database name of the database that owns the table
+   * @param tableName name of the table to extend
+   * @param columnSchemaList columns requested to be added
+   * @return a pair of (status, columns not yet present in the table); on {@code TABLE_NOT_EXISTS}
+   *     the right element is {@code null}
+   */
+  public synchronized Pair<TSStatus, List<TsTableColumnSchema>>
+      tableColumnCheckForColumnExtension(
+          String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    Map<String, List<TsTable>> currentUsingTable = clusterSchemaInfo.getAllUsingTables();
+    // Map.get returns null when the database has no entry; report that as a missing table
+    // instead of failing with a NullPointerException in the loop below.
+    List<TsTable> tablesInDatabase = currentUsingTable.get(database);
+    TsTable targetTable = null;
+    if (tablesInDatabase != null) {
+      for (TsTable table : tablesInDatabase) {
+        if (table.getTableName().equals(tableName)) {
+          targetTable = table;
+          break;
+        }
+      }
+    }
+
+    if (targetTable == null) {
+      return new Pair<>(
+          RpcUtils.getStatus(
+              TSStatusCode.TABLE_NOT_EXISTS,
+              String.format("Table %s.%s not exist", database, tableName)),
+          null);
+    }
+
+    // Keep only the columns that are new to the table; already existing ones are dropped silently.
+    List<TsTableColumnSchema> copiedList = new ArrayList<>();
+    for (TsTableColumnSchema columnSchema : columnSchemaList) {
+      if (targetTable.getColumnSchema(columnSchema.getColumnName()) == null) {
+        copiedList.add(columnSchema);
+      }
+    }
+    return new Pair<>(RpcUtils.SUCCESS_STATUS, copiedList);
+  }
+
+  /**
+   * Writes a non-rollback {@link AddTableColumnPlan} for the given table through the consensus
+   * manager.
+   *
+   * @return the status of the consensus write, or {@code INTERNAL_SERVER_ERROR} carrying the
+   *     exception message when the write throws a {@link ConsensusException}
+   */
+  public synchronized TSStatus addTableColumn(
+      String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    try {
+      return getConsensusManager()
+          .write(new AddTableColumnPlan(database, tableName, columnSchemaList, false));
+    } catch (ConsensusException consensusFailure) {
+      LOGGER.warn(consensusFailure.getMessage(), consensusFailure);
+      return RpcUtils.getStatus(
+          TSStatusCode.INTERNAL_SERVER_ERROR, consensusFailure.getMessage());
+    }
+  }
+
+  /**
+   * Writes a rollback {@link AddTableColumnPlan} (rollback flag set) for the given table through
+   * the consensus manager.
+   *
+   * @return the status of the consensus write, or {@code INTERNAL_SERVER_ERROR} carrying the
+   *     exception message when the write throws a {@link ConsensusException}
+   */
+  public synchronized TSStatus rollbackAddTableColumn(
+      String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    try {
+      return getConsensusManager()
+          .write(new AddTableColumnPlan(database, tableName, columnSchemaList, true));
+    } catch (ConsensusException consensusFailure) {
+      LOGGER.warn(consensusFailure.getMessage(), consensusFailure);
+      return RpcUtils.getStatus(
+          TSStatusCode.INTERNAL_SERVER_ERROR, consensusFailure.getMessage());
+    }
+  }
+
public void clearSchemaQuotaCache() {
schemaQuotaStatistics.clear();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 9079c1ee55b..5d258d2b1a5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -98,6 +98,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.Al
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
@@ -467,6 +468,8 @@ public class ConfigPlanExecutor {
return clusterSchemaInfo.rollbackCreateTable((RollbackCreateTablePlan)
physicalPlan);
case CommitCreateTable:
return clusterSchemaInfo.commitCreateTable((CommitCreateTablePlan)
physicalPlan);
+ case AddTableColumn:
+ return clusterSchemaInfo.addTableColumn((AddTableColumnPlan)
physicalPlan);
case CreatePipeV2:
return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
case SetPipeStatusV2:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
index db3487c8344..3056b5b0012 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDataba
import
org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
import
org.apache.iotdb.confignode.consensus.request.write.table.RollbackCreateTablePlan;
@@ -1091,6 +1092,41 @@ public class ClusterSchemaInfo implements
SnapshotProcessor {
}
}
+  /**
+   * Reads a table from the MTree under the database read lock.
+   *
+   * @throws RuntimeException wrapping the {@link MetadataException} raised by the lookup
+   */
+  public TsTable getTsTable(String database, String tableName) {
+    databaseReadWriteLock.readLock().lock();
+    try {
+      final PartialPath databasePath = new PartialPath(new String[] {ROOT, database});
+      return mTree.getTable(databasePath, tableName);
+    } catch (MetadataException lookupFailure) {
+      LOGGER.warn(lookupFailure.getMessage(), lookupFailure);
+      throw new RuntimeException(lookupFailure);
+    } finally {
+      databaseReadWriteLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Applies an {@link AddTableColumnPlan} to the MTree under the database write lock: a rollback
+   * plan undoes the column addition, any other plan performs it.
+   *
+   * @return {@code SUCCESS_STATUS}, or a status built from the {@link MetadataException} error code
+   *     and message when the MTree operation fails
+   */
+  public TSStatus addTableColumn(AddTableColumnPlan plan) {
+    databaseReadWriteLock.writeLock().lock();
+    try {
+      final PartialPath databasePath = new PartialPath(new String[] {ROOT, plan.getDatabase()});
+      if (plan.isRollback()) {
+        mTree.rollbackAddTableColumn(
+            databasePath, plan.getTableName(), plan.getColumnSchemaList());
+      } else {
+        mTree.addTableColumn(databasePath, plan.getTableName(), plan.getColumnSchemaList());
+      }
+      return RpcUtils.SUCCESS_STATUS;
+    } catch (MetadataException applyFailure) {
+      LOGGER.warn(applyFailure.getMessage(), applyFailure);
+      return RpcUtils.getStatus(applyFailure.getErrorCode(), applyFailure.getMessage());
+    } finally {
+      databaseReadWriteLock.writeLock().unlock();
+    }
+  }
+
// endregion
@TestOnly
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
index d3e7acb5c5e..76ded35c73f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.schema.node.utils.IMNodeIterator;
import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
import org.apache.iotdb.confignode.persistence.schema.mnode.IConfigMNode;
import
org.apache.iotdb.confignode.persistence.schema.mnode.factory.ConfigMNodeFactory;
@@ -703,6 +704,42 @@ public class ConfigMTree {
return result;
}
+  /**
+   * Returns the table stored in the child node named {@code tableName} of the given database.
+   *
+   * @throws TableNotExistsException if the database node has no child with that name
+   * @throws MetadataException if the database node cannot be resolved
+   */
+  public TsTable getTable(PartialPath database, String tableName) throws MetadataException {
+    final IConfigMNode dbNode = getDatabaseNodeByDatabasePath(database).getAsMNode();
+    if (dbNode.hasChild(tableName)) {
+      return ((ConfigTableNode) dbNode.getChild(tableName)).getTable();
+    }
+    // Strip the leading "<ROOT>." so the exception reports the bare database name.
+    throw new TableNotExistsException(
+        database.getFullPath().substring(ROOT.length() + 1), tableName);
+  }
+
+  /**
+   * Adds every schema in {@code columnSchemaList} to the table stored under the given database.
+   *
+   * @throws TableNotExistsException if the database node has no child named {@code tableName}
+   * @throws MetadataException if the database node cannot be resolved
+   */
+  public void addTableColumn(
+      PartialPath database, String tableName, List<TsTableColumnSchema> columnSchemaList)
+      throws MetadataException {
+    final IConfigMNode dbNode = getDatabaseNodeByDatabasePath(database).getAsMNode();
+    final boolean tableExists = dbNode.hasChild(tableName);
+    if (!tableExists) {
+      // Strip the leading "<ROOT>." so the exception reports the bare database name.
+      final String databaseName = database.getFullPath().substring(ROOT.length() + 1);
+      throw new TableNotExistsException(databaseName, tableName);
+    }
+    final TsTable targetTable = ((ConfigTableNode) dbNode.getChild(tableName)).getTable();
+    columnSchemaList.forEach(targetTable::addColumnSchema);
+  }
+
+  /**
+   * Removes, by column name, every schema in {@code columnSchemaList} from the table stored under
+   * the given database.
+   *
+   * @throws TableNotExistsException if the database node has no child named {@code tableName}
+   * @throws MetadataException if the database node cannot be resolved
+   */
+  public void rollbackAddTableColumn(
+      PartialPath database, String tableName, List<TsTableColumnSchema> columnSchemaList)
+      throws MetadataException {
+    final IConfigMNode dbNode = getDatabaseNodeByDatabasePath(database).getAsMNode();
+    final boolean tableExists = dbNode.hasChild(tableName);
+    if (!tableExists) {
+      // Strip the leading "<ROOT>." so the exception reports the bare database name.
+      final String databaseName = database.getFullPath().substring(ROOT.length() + 1);
+      throw new TableNotExistsException(databaseName, tableName);
+    }
+    final TsTable targetTable = ((ConfigTableNode) dbNode.getChild(tableName)).getTable();
+    for (TsTableColumnSchema columnSchema : columnSchemaList) {
+      targetTable.removeColumnSchema(columnSchema.getColumnName());
+    }
+  }
+
// endregion
// region Serialization and Deserialization
@@ -726,7 +763,7 @@ public class ConfigMTree {
if (child.isDatabase()) {
serializeDatabaseNode(child.getAsDatabaseMNode(), outputStream);
} else if (child instanceof ConfigTableNode) {
- serializeTableNode((ConfigTableNode) node, outputStream);
+ serializeTableNode((ConfigTableNode) child, outputStream);
} else {
serializeConfigBasicMNode(child, outputStream);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java
new file mode 100644
index 00000000000..74fbfaa1728
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.schema.table;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTableInternalRPCType;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.state.schema.AddTableColumnState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class AddTableColumnProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, AddTableColumnState>
{
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AddTableColumnProcedure.class);
+
+ // Database that owns the target table.
+ private String database;
+
+ // Table that receives the new columns.
+ private String tableName;
+
+ // Id of the query that issued the request.
+ private String queryId;
+
+ // Columns as requested by the caller.
+ private List<TsTableColumnSchema> inputColumnList;
+
+ // Subset of the requested columns returned by the column check; null until that check succeeds.
+ private List<TsTableColumnSchema> actualAddedColumnList;
+
+ /** Creates an empty procedure; no field is initialized here. */
+ public AddTableColumnProcedure() {}
+
+ /**
+ * Creates a procedure for one add-column request.
+ *
+ * @param database database that owns the target table
+ * @param tableName table that receives the new columns
+ * @param queryId id of the query that issued the request
+ * @param inputColumnList columns requested to be added; stored as-is, not copied
+ */
+ public AddTableColumnProcedure(
+ String database,
+ String tableName,
+ String queryId,
+ List<TsTableColumnSchema> inputColumnList) {
+ this.database = database;
+ this.tableName = tableName;
+ this.queryId = queryId;
+ this.inputColumnList = inputColumnList;
+ }
+
+ /**
+ * Executes one step of the procedure for the given state and logs how long the step took.
+ * COLUMN_CHECK, PRE_RELEASE and COMMIT_RELEASE each run their step and report HAS_MORE_STATE;
+ * ADD_COLUMN and any unrecognized state end the procedure with NO_MORE_STATE.
+ */
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env,
AddTableColumnState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ long startTime = System.currentTimeMillis();
+ try {
+ switch (state) {
+ case COLUMN_CHECK:
+ LOGGER.info("Column check for table {}.{} when adding column",
database, tableName);
+ columnCheck(env);
+ break;
+ case PRE_RELEASE:
+ LOGGER.info("Pre release info of table {}.{} when adding column",
database, tableName);
+ preRelease(env);
+ break;
+ case COMMIT_RELEASE:
+ LOGGER.info("Commit release info of table {}.{} when adding column",
database, tableName);
+ commitRelease(env);
+ break;
+ case ADD_COLUMN:
+ LOGGER.info("Add column to table {}.{}", database, tableName);
+ addColumn(env);
+ // Last step: nothing follows it.
+ return Flow.NO_MORE_STATE;
+ default:
+ setFailure(new ProcedureException("Unrecognized AddTableColumnState
" + state));
+ return Flow.NO_MORE_STATE;
+ }
+ // The step helpers pick the next state (or record a failure) themselves.
+ return Flow.HAS_MORE_STATE;
+ } finally {
+ // Logged on every exit path, including exceptional ones.
+ LOGGER.info(
+ "AddTableColumn-{}.{}-{} costs {}ms",
+ database,
+ tableName,
+ state,
+ (System.currentTimeMillis() - startTime));
+ }
+ }
+
+ private void columnCheck(ConfigNodeProcedureEnv env) {
+ Pair<TSStatus, List<TsTableColumnSchema>> result =
+ env.getConfigManager()
+ .getClusterSchemaManager()
+ .tableColumnCheckForColumnExtension(database, tableName,
inputColumnList);
+ TSStatus status = result.getLeft();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ setFailure(new ProcedureException(new
IoTDBException(status.getMessage(), status.getCode())));
+ return;
+ }
+ actualAddedColumnList = result.getRight();
+ setNextState(AddTableColumnState.PRE_RELEASE);
+ }
+
+ private void preRelease(ConfigNodeProcedureEnv env) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+
+ TUpdateTableReq req =
+ new TUpdateTableReq(
+ TsTableInternalRPCType.PRE_ADD_COLUMN.getOperationType(),
getCacheRequestInfo());
+
+ AsyncClientHandler<TUpdateTableReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.UPDATE_TABLE, req,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+ for (TSStatus status : statusMap.values()) {
+ // all dataNodes must clear the related schema cache
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn(
+ "Failed to pre-release column extension info of table {}.{}",
database, tableName);
+ setFailure(
+ new ProcedureException(
+ new MetadataException("Pre-release table column extension info
failed")));
+ return;
+ }
+ }
+ setNextState(AddTableColumnState.COMMIT_RELEASE);
+ }
+
+ /**
+ * Sends a COMMIT_ADD_COLUMN table update to every registered DataNode and, when all of them
+ * answer with success, moves the procedure to ADD_COLUMN.
+ */
+ private void commitRelease(ConfigNodeProcedureEnv env) {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+
+ TUpdateTableReq req =
+ new TUpdateTableReq(
+ TsTableInternalRPCType.COMMIT_ADD_COLUMN.getOperationType(),
getCacheRequestInfo());
+
+ AsyncClientHandler<TUpdateTableReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.UPDATE_TABLE, req,
dataNodeLocationMap);
+
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
+ for (TSStatus status : statusMap.values()) {
+ // all dataNodes must clear the related schema cache
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn("Failed to commit column extension info of table {}.{}",
database, tableName);
+ // todo async retry until success
+ // NOTE(review): this path neither sets a next state nor records a failure, while the
+ // caller still reports HAS_MORE_STATE; what the framework does then is not visible
+ // here (presumably the same state is executed again) - confirm.
+ return;
+ }
+ }
+ setNextState(AddTableColumnState.ADD_COLUMN);
+ }
+
+  /**
+   * Asks the cluster schema manager to add {@code inputColumnList} to the table and marks the
+   * procedure as failed when the returned status is not a success. No next state is set here.
+   *
+   * @param env procedure environment giving access to the config manager
+   */
+  private void addColumn(ConfigNodeProcedureEnv env) {
+    TSStatus addStatus =
+        env.getConfigManager()
+            .getClusterSchemaManager()
+            .addTableColumn(database, tableName, inputColumnList);
+    if (addStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    }
+    setFailure(
+        new ProcedureException(new IoTDBException(addStatus.getMessage(), addStatus.getCode())));
+  }
+
+  /**
+   * Undoes the side effects of the given state: {@code ADD_COLUMN} removes the columns again via
+   * {@link #rollbackAddColumn}, {@code PRE_RELEASE} sends the cache rollback via {@link
+   * #rollbackUpdateCache}. Other states have nothing to undo. The elapsed time is always logged.
+   *
+   * @param env procedure environment giving access to the config manager
+   * @param state the state whose effects are rolled back
+   */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, AddTableColumnState state)
+      throws IOException, InterruptedException, ProcedureException {
+    long startTime = System.currentTimeMillis();
+    try {
+      switch (state) {
+        case ADD_COLUMN:
+          rollbackAddColumn(env);
+          break;
+        case PRE_RELEASE:
+          rollbackUpdateCache(env);
+          break;
+        default:
+          // COLUMN_CHECK and COMMIT_RELEASE have no dedicated rollback action
+          break;
+      }
+    } finally {
+      // The message used to say "DropTable", which mislabelled this procedure in the logs.
+      LOGGER.info(
+          "Rollback AddTableColumn-{} costs {}ms.",
+          state,
+          (System.currentTimeMillis() - startTime));
+    }
+  }
+
+  /**
+   * Asks the cluster schema manager to remove the columns in {@code actualAddedColumnList}
+   * again. Does nothing when that list is still {@code null}; marks the procedure as failed when
+   * the manager returns a non-success status.
+   *
+   * @param env procedure environment giving access to the config manager
+   */
+  private void rollbackAddColumn(ConfigNodeProcedureEnv env) {
+    if (actualAddedColumnList != null) {
+      TSStatus rollbackStatus =
+          env.getConfigManager()
+              .getClusterSchemaManager()
+              .rollbackAddTableColumn(database, tableName, actualAddedColumnList);
+      boolean succeeded =
+          rollbackStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      if (!succeeded) {
+        setFailure(
+            new ProcedureException(
+                new IoTDBException(rollbackStatus.getMessage(), rollbackStatus.getCode())));
+      }
+    }
+  }
+
+  /**
+   * Sends a {@code ROLLBACK_ADD_COLUMN} table update, carrying the payload built by {@link
+   * #getCacheRequestInfo()}, to the DataNode locations returned by the node manager, and marks
+   * the procedure as failed if any response is not a success status.
+   *
+   * @param env procedure environment giving access to the config manager
+   */
+  private void rollbackUpdateCache(ConfigNodeProcedureEnv env) {
+    Map<Integer, TDataNodeLocation> targetDataNodes =
+        env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+    TUpdateTableReq rollbackReq =
+        new TUpdateTableReq(
+            TsTableInternalRPCType.ROLLBACK_ADD_COLUMN.getOperationType(),
+            getCacheRequestInfo());
+    AsyncClientHandler<TUpdateTableReq, TSStatus> handler =
+        new AsyncClientHandler<>(DataNodeRequestType.UPDATE_TABLE, rollbackReq, targetDataNodes);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(handler);
+
+    boolean rejected =
+        handler.getResponseMap().values().stream()
+            .anyMatch(
+                response -> response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (rejected) {
+      LOGGER.warn("Failed to rollback cache of table {}.{}", database, tableName);
+      setFailure(new ProcedureException(new MetadataException("Rollback table cache failed")));
+    }
+  }
+
+  /**
+   * Builds the payload of a table update request: the database name, the table name and then
+   * {@code actualAddedColumnList}, in that order.
+   *
+   * @return a buffer wrapping the serialized bytes
+   */
+  private ByteBuffer getCacheRequestInfo() {
+    ByteArrayOutputStream payload = new ByteArrayOutputStream();
+    try {
+      ReadWriteIOUtils.write(database, payload);
+      ReadWriteIOUtils.write(tableName, payload);
+      TsTableColumnSchemaUtil.serialize(actualAddedColumnList, payload);
+    } catch (IOException ignored) {
+      // won't happen
+    }
+    byte[] bytes = payload.toByteArray();
+    return ByteBuffer.wrap(bytes);
+  }
+
+  /**
+   * Resolves a state id to the {@link AddTableColumnState} constant at that position in {@code
+   * values()}.
+   */
+  @Override
+  protected AddTableColumnState getState(int stateId) {
+    AddTableColumnState[] allStates = AddTableColumnState.values();
+    return allStates[stateId];
+  }
+
+  /** Returns the ordinal of the given state, the inverse of {@link #getState(int)}. */
+  @Override
+  protected int getStateId(AddTableColumnState state) {
+    int stateId = state.ordinal();
+    return stateId;
+  }
+
+  /**
+   * Returns the state the procedure starts in.
+   *
+   * <p>The steps chain COLUMN_CHECK -> PRE_RELEASE -> COMMIT_RELEASE -> ADD_COLUMN, and {@code
+   * addColumn} sets no further state. The procedure must therefore begin with the column check;
+   * starting at ADD_COLUMN would skip the check and both DataNode release steps.
+   */
+  @Override
+  protected AddTableColumnState getInitialState() {
+    return AddTableColumnState.COLUMN_CHECK;
+  }
+
+  /** Returns the name of the database that owns the table being altered. */
+  public String getDatabase() {
+    return this.database;
+  }
+
+  /** Returns the name of the table that columns are added to. */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  /** Returns the id of the query that this procedure was created with. */
+  public String getQueryId() {
+    return this.queryId;
+  }
+
+  /** Returns the column list as it was passed in; the internal list itself, not a copy. */
+  public List<TsTableColumnSchema> getInputColumnList() {
+    return this.inputColumnList;
+  }
+
+  /**
+   * Writes the procedure to {@code stream}: the {@code ADD_TABLE_COLUMN_PROCEDURE} type code as
+   * a short, the superclass fields, the database, table name and query id, and finally the input
+   * column list followed by the actually added column list.
+   *
+   * @param stream destination of the serialized form
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    short typeCode = ProcedureType.ADD_TABLE_COLUMN_PROCEDURE.getTypeCode();
+    stream.writeShort(typeCode);
+    super.serialize(stream);
+
+    // identifying strings first
+    ReadWriteIOUtils.write(this.database, stream);
+    ReadWriteIOUtils.write(this.tableName, stream);
+    ReadWriteIOUtils.write(this.queryId, stream);
+
+    // then both column lists, in the order deserialize(ByteBuffer) reads them back
+    TsTableColumnSchemaUtil.serialize(this.inputColumnList, stream);
+    TsTableColumnSchemaUtil.serialize(this.actualAddedColumnList, stream);
+  }
+
+  /**
+   * Restores the procedure from {@code byteBuffer}: the superclass fields, then the database,
+   * table name and query id, then the input column list and the actually added column list.
+   *
+   * @param byteBuffer buffer positioned at the start of the superclass fields
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+
+    database = ReadWriteIOUtils.readString(byteBuffer);
+    tableName = ReadWriteIOUtils.readString(byteBuffer);
+    queryId = ReadWriteIOUtils.readString(byteBuffer);
+
+    inputColumnList = TsTableColumnSchemaUtil.deserializeColumnSchemaList(byteBuffer);
+    actualAddedColumnList = TsTableColumnSchemaUtil.deserializeColumnSchemaList(byteBuffer);
+  }
+
+  /**
+   * Two procedures are equal when their database, table name and query id are equal; the column
+   * lists are not compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof AddTableColumnProcedure)) {
+      return false;
+    }
+    AddTableColumnProcedure other = (AddTableColumnProcedure) o;
+    boolean sameTable =
+        Objects.equals(database, other.database) && Objects.equals(tableName, other.tableName);
+    return sameTable && Objects.equals(queryId, other.queryId);
+  }
+
+  /** Hashes the same three fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    int hash = Objects.hash(database, tableName, queryId);
+    return hash;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java
similarity index 69%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
copy to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java
index 16d26300473..3d4456e2de2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java
@@ -17,17 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.schemaengine.table;
+package org.apache.iotdb.confignode.procedure.state.schema;
-import org.apache.iotdb.commons.schema.table.TsTable;
-
-public interface ITableCache {
-
- void init(byte[] tableInitializationBytes);
-
- void preCreateTable(String database, TsTable table);
-
- void rollbackCreateTable(String database, String tableName);
-
- void commitCreateTable(String database, String tableName);
+/**
+ * States of the add-table-column procedure.
+ *
+ * <p>The procedure converts between these constants and integer state ids through {@code
+ * ordinal()} and {@code values()[id]}, so the declaration order is part of the id mapping.
+ */
+public enum AddTableColumnState {
+ // validate the requested columns against the existing table
+ COLUMN_CHECK,
+ // send the PRE_ADD_COLUMN update to the DataNodes
+ PRE_RELEASE,
+ // send the COMMIT_ADD_COLUMN update to the DataNodes
+ COMMIT_RELEASE,
+ // add the columns through the cluster schema manager
+ ADD_COLUMN
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 30caa199e03..607e812ea4d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedu
import org.apache.iotdb.confignode.procedure.impl.schema.SetTTLProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure;
@@ -182,6 +183,9 @@ public class ProcedureFactory implements IProcedureFactory {
case CREATE_TABLE_PROCEDURE:
procedure = new CreateTableProcedure();
break;
+ case ADD_TABLE_COLUMN_PROCEDURE:
+ procedure = new AddTableColumnProcedure();
+ break;
case CREATE_PIPE_PLUGIN_PROCEDURE:
procedure = new CreatePipePluginProcedure();
break;
@@ -315,6 +319,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.UNSET_TEMPLATE_PROCEDURE;
} else if (procedure instanceof CreateTableProcedure) {
return ProcedureType.CREATE_TABLE_PROCEDURE;
+ } else if (procedure instanceof AddTableColumnProcedure) {
+ return ProcedureType.ADD_TABLE_COLUMN_PROCEDURE;
} else if (procedure instanceof CreatePipePluginProcedure) {
return ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE;
} else if (procedure instanceof DropPipePluginProcedure) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index b67141f3796..637fb368b04 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -63,6 +63,8 @@ public enum ProcedureType {
SET_TEMPLATE_PROCEDURE((short) 702),
CREATE_TABLE_PROCEDURE((short) 750),
+ DROP_TABLE_PROCEDURE((short) 751),
+ ADD_TABLE_COLUMN_PROCEDURE((short) 752),
// ProcedureId 800-899 is used by IoTDB-Ml
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a237c57d62c..d68430f3181 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -72,6 +72,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
@@ -1122,4 +1123,9 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
public TSStatus createTable(ByteBuffer tableInfo) throws TException {
return configManager.createTable(tableInfo);
}
+
+  /** Forwards the alter-table request to the config manager and returns its status. */
+  @Override
+  public TSStatus alterTable(TAlterTableReq req) throws TException {
+    TSStatus status = configManager.alterTable(req);
+    return status;
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 4fff64feb2e..b8bb9a26f21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthizedPatternTreeResp;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
@@ -1102,6 +1103,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.createTable(tableInfo), status ->
!updateConfigNodeLeader(status));
}
+  /**
+   * Issues {@code alterTable} on the underlying client through {@code
+   * executeRemoteCallWithRetry}, passing {@code !updateConfigNodeLeader(status)} as the status
+   * predicate.
+   */
+  @Override
+  public TSStatus alterTable(TAlterTableReq req) throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.alterTable(req),
+        status -> {
+          boolean leaderUpdated = updateConfigNodeLeader(status);
+          return !leaderUpdated;
+        });
+  }
+
public static class Factory extends ThriftClientFactory<ConfigRegionId,
ConfigNodeClient> {
public Factory(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 174d91350d8..3fd8a43e064 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCType;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -1443,6 +1444,27 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
ReadWriteIOUtils.readString(req.tableInfo),
ReadWriteIOUtils.readString(req.tableInfo));
break;
+ case PRE_ADD_COLUMN:
+ DataNodeTableCache.getInstance()
+ .preAddTableColumn(
+ ReadWriteIOUtils.readString(req.tableInfo),
+ ReadWriteIOUtils.readString(req.tableInfo),
+
TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.tableInfo));
+ break;
+ case COMMIT_ADD_COLUMN:
+ DataNodeTableCache.getInstance()
+ .commitAddTableColumn(
+ ReadWriteIOUtils.readString(req.tableInfo),
+ ReadWriteIOUtils.readString(req.tableInfo),
+
TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.tableInfo));
+ break;
+ case ROLLBACK_ADD_COLUMN:
+ DataNodeTableCache.getInstance()
+ .rollbackAddColumn(
+ ReadWriteIOUtils.readString(req.tableInfo),
+ ReadWriteIOUtils.readString(req.tableInfo),
+
TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.tableInfo));
+ break;
default:
LOGGER.warn("Unsupported type {} when updating table", req.type);
return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 63ae1cf2a20..f8d7071fa8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -45,8 +45,11 @@ import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeT
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.schema.table.AlterTableOperationType;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
@@ -58,6 +61,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterTableReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
@@ -2900,6 +2904,51 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ /**
+ * Sends an ALTER TABLE ... ADD COLUMN request to the ConfigNode and reports the outcome
+ * through the returned future.
+ *
+ * @param database database that owns the table
+ * @param tableName table to extend
+ * @param columnSchemaList columns to add; sent in serialized form as the request's update info
+ * @param queryId id of the issuing query, forwarded in the request
+ * @return a future completed with a success result, or failed with an {@code IoTDBException}
+ * built from the returned status, or with the client/transport exception that occurred
+ */
+ @Override
+ public SettableFuture<ConfigTaskResult> alterTableAddColumn(
+ String database,
+ String tableName,
+ List<TsTableColumnSchema> columnSchemaList,
+ String queryId) {
+ final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ TSStatus tsStatus;
+ TAlterTableReq req = new TAlterTableReq();
+ req.setDatabase(database);
+ req.setTableName(tableName);
+ req.setQueryId(queryId);
+ req.setOperationType(AlterTableOperationType.ADD_COLUMN.getTypeValue());
+ req.setUpdateInfo(TsTableColumnSchemaUtil.serialize(columnSchemaList));
+
+ // NOTE(review): the same request is re-sent immediately, with no delay visible here, for
+ // as long as the status is OVERLAP_WITH_EXISTING_TASK.
+ do {
+ try {
+ tsStatus = client.alterTable(req);
+ } catch (TTransportException e) {
+ if (e.getType() == TTransportException.TIMED_OUT
+ || e.getCause() instanceof SocketTimeoutException) {
+ // time out mainly caused by slow execution, wait until
+ // a timeout is mapped to OVERLAP_WITH_EXISTING_TASK so that the loop below asks again
+ tsStatus =
RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
+ } else {
+ throw e;
+ }
+ }
+ // keep waiting until task ends
+ } while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() ==
tsStatus.getCode());
+
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.warn(
+ "Failed to add column to table {}.{}, status is {}.", database,
tableName, tsStatus);
+ future.setException(new IoTDBException(tsStatus.getMessage(),
tsStatus.getCode()));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ // borrowing the client failed, or a non-timeout transport/thrift error was rethrown above
+ future.setException(e);
+ }
+ return future;
+ }
+
public void handlePipeConfigClientExit(String clientId) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 01716c2da9a..794ed9ff393 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
import org.apache.iotdb.db.protocol.session.IClientSession;
@@ -85,6 +86,8 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.List;
+
public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> setDatabase(DatabaseSchemaStatement
databaseSchemaStatement);
@@ -264,4 +267,10 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> describeTable(String database, String
tableName);
SettableFuture<ConfigTaskResult> showTables(String database);
+
+ /**
+ * Adds the given columns to an existing table.
+ *
+ * @param database database that owns the table
+ * @param tableName table to extend
+ * @param columnSchemaList schemas of the columns to add
+ * @param queryId id of the query issuing the change
+ * @return a future carrying the result of the config task
+ */
+ SettableFuture<ConfigTaskResult> alterTableAddColumn(
+ String database,
+ String tableName,
+ List<TsTableColumnSchema> columnSchemaList,
+ String queryId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java
new file mode 100644
index 00000000000..febd54cc8a7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational;
+
+import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.type.BinaryType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
+/**
+ * Config task that adds columns to an existing table. The analyzer-level {@link ColumnSchema}
+ * list is converted to {@link TsTableColumnSchema} objects once, in the constructor, and the
+ * actual work is delegated to {@link IConfigTaskExecutor#alterTableAddColumn}.
+ */
+public class AlterTableAddColumnTask implements IConfigTask {
+
+  private final String database;
+
+  private final String tableName;
+
+  private final List<TsTableColumnSchema> columnList;
+
+  private final String queryId;
+
+  /**
+   * @param database database that owns the table
+   * @param tableName table to extend
+   * @param columnList columns requested by the statement
+   * @param queryId id of the issuing query
+   * @throws SemanticException if a column has an unsupported category, or an ID/ATTRIBUTE column
+   *     is not of type TEXT
+   */
+  public AlterTableAddColumnTask(
+      String database, String tableName, List<ColumnSchema> columnList, String queryId) {
+    this.database = database;
+    this.tableName = tableName;
+    this.columnList = parseInputColumnSchema(columnList);
+    this.queryId = queryId;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.alterTableAddColumn(database, tableName, columnList, queryId);
+  }
+
+  /**
+   * Converts each requested column to its table-schema counterpart. ID and ATTRIBUTE columns must
+   * be TEXT; MEASUREMENT columns get the default encoding for their data type and the
+   * compressor configured in the TsFile config.
+   */
+  private List<TsTableColumnSchema> parseInputColumnSchema(List<ColumnSchema> inputColumnList) {
+    List<TsTableColumnSchema> columnSchemaList = new ArrayList<>(inputColumnList.size());
+    for (ColumnSchema inputColumn : inputColumnList) {
+      switch (inputColumn.getColumnCategory()) {
+        case ID:
+          if (!inputColumn.getType().equals(BinaryType.TEXT)) {
+            throw new SemanticException("Id column only support data type TEXT.");
+          }
+          columnSchemaList.add(new IdColumnSchema(inputColumn.getName(), TSDataType.TEXT));
+          break;
+        case ATTRIBUTE:
+          if (!inputColumn.getType().equals(BinaryType.TEXT)) {
+            throw new SemanticException("Attribute column only support data type TEXT.");
+          }
+          columnSchemaList.add(new AttributeColumnSchema(inputColumn.getName(), TSDataType.TEXT));
+          break;
+        case MEASUREMENT:
+          TSDataType dataType = InternalTypeManager.getTSDataType(inputColumn.getType());
+          columnSchemaList.add(
+              new MeasurementColumnSchema(
+                  inputColumn.getName(),
+                  dataType,
+                  getDefaultEncoding(dataType),
+                  TSFileDescriptor.getInstance().getConfig().getCompressor()));
+          break;
+        default:
+          // Reject instead of silently dropping the column: otherwise the statement could
+          // report success without the column ever being part of the request.
+          throw new SemanticException(
+              "Column "
+                  + inputColumn.getName()
+                  + " has unsupported category "
+                  + inputColumn.getColumnCategory()
+                  + " for adding to a table.");
+      }
+    }
+    return columnSchemaList;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index 4b92c961c80..d1f4d75ba31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.schemaengine.table;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.TsTableInternalRPCUtil;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -45,6 +46,9 @@ public class DataNodeTableCache implements ITableCache {
private final Map<String, Map<String, TsTable>> preCreateTableMap = new
ConcurrentHashMap<>();
+ private final Map<String, Map<String, List<TsTableColumnSchema>>>
preAddColumnMap =
+ new ConcurrentHashMap<>();
+
private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock();
private DataNodeTableCache() {
@@ -146,6 +150,66 @@ public class DataNodeTableCache implements ITableCache {
}
}
+  /**
+   * Records {@code columnSchemaList} as pending additions for the given table, appending to any
+   * columns already pending for it. Runs under the write lock.
+   */
+  @Override
+  public void preAddTableColumn(
+      String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    readWriteLock.writeLock().lock();
+    try {
+      Map<String, List<TsTableColumnSchema>> pendingOfDatabase =
+          preAddColumnMap.computeIfAbsent(database, k -> new ConcurrentHashMap<>());
+      List<TsTableColumnSchema> pendingOfTable =
+          pendingOfDatabase.computeIfAbsent(tableName, k -> new ArrayList<>());
+      pendingOfTable.addAll(columnSchemaList);
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Adds {@code columnSchemaList} to the cached table and then drops the table's pending entry,
+   * removing the database's entry as well once it has no pending tables left. Runs under the
+   * write lock.
+   *
+   * @throws IllegalStateException if nothing is pending for {@code database}
+   */
+  @Override
+  public void commitAddTableColumn(
+      String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    readWriteLock.writeLock().lock();
+    try {
+      TsTable cachedTable = databaseTableMap.get(database).get(tableName);
+      for (TsTableColumnSchema columnSchema : columnSchemaList) {
+        cachedTable.addColumnSchema(columnSchema);
+      }
+      preAddColumnMap.compute(
+          database,
+          (db, pendingTables) -> {
+            if (pendingTables == null) {
+              throw new IllegalStateException();
+            }
+            pendingTables.remove(tableName);
+            // returning null removes the database's mapping altogether
+            return pendingTables.isEmpty() ? null : pendingTables;
+          });
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Discards the pending column additions recorded for the given table, removing the database's
+   * entry as well once it has no pending tables left. Runs under the write lock.
+   *
+   * <p>The call is a no-op when nothing is pending for {@code database}, so a rollback that
+   * reaches a node which never recorded the pre-add (or that is delivered twice) does not fail.
+   *
+   * @param columnSchemaList unused; everything pending for the table is dropped
+   */
+  @Override
+  public void rollbackAddColumn(
+      String database, String tableName, List<TsTableColumnSchema> columnSchemaList) {
+    readWriteLock.writeLock().lock();
+    try {
+      preAddColumnMap.computeIfPresent(
+          database,
+          (db, pendingTables) -> {
+            pendingTables.remove(tableName);
+            // returning null removes the database's mapping altogether
+            return pendingTables.isEmpty() ? null : pendingTables;
+          });
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
public TsTable getTable(String database, String tableName) {
readWriteLock.readLock().lock();
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
index 16d26300473..fdc1e38fa88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.schemaengine.table;
import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+
+import java.util.List;
public interface ITableCache {
@@ -30,4 +33,13 @@ public interface ITableCache {
void rollbackCreateTable(String database, String tableName);
void commitCreateTable(String database, String tableName);
+
+ /**
+ * First phase of adding columns: registers the columns as pending for the table.
+ */
+ void preAddTableColumn(
+ String database, String tableName, List<TsTableColumnSchema>
columnSchemaList);
+
+ /**
+ * Final phase of adding columns: makes the columns part of the cached table.
+ */
+ void commitAddTableColumn(
+ String database, String tableName, List<TsTableColumnSchema>
columnSchemaList);
+
+ /**
+ * Abandons a column addition that was started with preAddTableColumn.
+ */
+ void rollbackAddColumn(
+ String database, String tableName, List<TsTableColumnSchema>
columnSchemaList);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterTableOperationType.java
similarity index 64%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterTableOperationType.java
index 16d26300473..fe9d8b7e4fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/AlterTableOperationType.java
@@ -17,17 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.db.schemaengine.table;
+package org.apache.iotdb.commons.schema.table;
-import org.apache.iotdb.commons.schema.table.TsTable;
+/** Kinds of ALTER TABLE operations, each identified by a one-byte type value. */
+public enum AlterTableOperationType {
+  ADD_COLUMN((byte) 0);
-public interface ITableCache {
+  private final byte type;
- void init(byte[] tableInitializationBytes);
+  AlterTableOperationType(byte type) {
+    this.type = type;
+  }
- void preCreateTable(String database, TsTable table);
+  /** Returns the byte value that identifies this operation type. */
+  public byte getTypeValue() {
+    return type;
+  }
- void rollbackCreateTable(String database, String tableName);
-
- void commitCreateTable(String database, String tableName);
+  /**
+   * Resolves a type value back to its constant.
+   *
+   * @throws IllegalArgumentException if {@code value} matches no operation type
+   */
+  public static AlterTableOperationType getType(byte value) {
+    switch (value) {
+      case 0:
+        return ADD_COLUMN;
+      default:
+        // name the offending value so a bad request can be diagnosed from the message
+        throw new IllegalArgumentException("Unknown alter table operation type: " + value);
+    }
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index 054928cca63..dd0d4662cc7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.schema.table;
import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
@@ -52,6 +53,8 @@ public class TsTable {
private Map<String, String> props = null;
+ private transient int idNums = 0;
+
public TsTable(String tableName) {
this.tableName = tableName;
columnSchemaMap.put(TIME_COLUMN_NAME, TIME_COLUMN_SCHEMA);
@@ -67,12 +70,23 @@ public class TsTable {
+ /**
+ * Registers the column under its name and counts it in idNums when it is an ID column.
+ */
public void addColumnSchema(TsTableColumnSchema columnSchema) {
columnSchemaMap.put(columnSchema.getColumnName(), columnSchema);
+ // NOTE(review): the counter is incremented even when the put above replaced an existing
+ // column of the same name, so re-adding an ID column would count it twice - confirm callers
+ // never do that.
+ if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) {
+ idNums++;
+ }
+ }
+
+  /**
+   * Removes the column with the given name, if present, and keeps the ID column counter in sync
+   * with the map.
+   *
+   * @param columnName name of the column to remove; unknown names are ignored
+   */
+  public void removeColumnSchema(String columnName) {
+    TsTableColumnSchema removed = columnSchemaMap.remove(columnName);
+    // addColumnSchema increments idNums for ID columns, so removal has to undo that.
+    if (removed != null && removed.getColumnCategory().equals(TsTableColumnCategory.ID)) {
+      idNums--;
+    }
  }
public int getColumnNum() {
return columnSchemaMap.size();
}
+  /** Returns the current value of the ID column counter. */
+  public int getIdNums() {
+    return this.idNums;
+  }
+
public List<TsTableColumnSchema> getColumnList() {
return new ArrayList<>(columnSchemaMap.values());
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTableInternalRPCType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTableInternalRPCType.java
index 06e08bb41be..2ff0edcc633 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTableInternalRPCType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTableInternalRPCType.java
@@ -28,7 +28,11 @@ import java.nio.ByteBuffer;
public enum TsTableInternalRPCType {
PRE_CREATE((byte) 0),
ROLLBACK_CREATE((byte) 1),
- COMMIT_CREATE((byte) 2);
+ COMMIT_CREATE((byte) 2),
+
+ PRE_ADD_COLUMN((byte) 6),
+ COMMIT_ADD_COLUMN((byte) 7),
+ ROLLBACK_ADD_COLUMN((byte) 8);
private final byte operationType;
@@ -57,6 +61,12 @@ public enum TsTableInternalRPCType {
return ROLLBACK_CREATE;
case 2:
return COMMIT_CREATE;
+ case 6:
+ return PRE_ADD_COLUMN;
+ case 7:
+ return COMMIT_ADD_COLUMN;
+ case 8:
+ return ROLLBACK_ADD_COLUMN;
default:
throw new IllegalArgumentException("Unknown table update operation
type" + type);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java
index 4ebb8a25231..eac67b2b3b2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.commons.schema.table.column;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
public class TsTableColumnSchemaUtil {
@@ -77,4 +80,38 @@ public class TsTableColumnSchemaUtil {
throw new IllegalArgumentException();
}
}
+
+ /**
+ * Serializes a column schema list into a freshly allocated byte array by delegating to
+ * {@link #serialize(List, OutputStream)}.
+ *
+ * @param columnSchemaList the schemas to encode; {@code null} is encoded as the size marker -1
+ * @return the encoded bytes
+ * @throws IllegalStateException if the delegate unexpectedly reports an I/O failure
+ */
+ public static byte[] serialize(List<TsTableColumnSchema> columnSchemaList) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ serialize(columnSchemaList, stream);
+ } catch (IOException e) {
+ // Writing to a ByteArrayOutputStream is not expected to fail. If it ever does, fail loudly
+ // rather than hand back a truncated array that would later decode as a corrupt schema list.
+ throw new IllegalStateException("Failed to serialize column schema list", e);
+ }
+ return stream.toByteArray();
+ }
+
+ /**
+ * Writes a column schema list to {@code stream}: first the element count as an int, then each
+ * schema in list order. A {@code null} list is written as the single int -1.
+ *
+ * @param columnSchemaList the schemas to encode, may be {@code null}
+ * @param stream destination of the encoded bytes
+ * @throws IOException if writing to {@code stream} fails
+ */
+ public static void serialize(List<TsTableColumnSchema> columnSchemaList, OutputStream stream)
+ throws IOException {
+ final boolean absent = columnSchemaList == null;
+ final int encodedSize = absent ? -1 : columnSchemaList.size();
+ ReadWriteIOUtils.write(encodedSize, stream);
+ if (!absent) {
+ for (TsTableColumnSchema schema : columnSchemaList) {
+ serialize(schema, stream);
+ }
+ }
+ }
+
+ /**
+ * Reads a column schema list from {@code buffer}: an int element count followed by that many
+ * schemas, each decoded with {@code deserialize(buffer)}.
+ *
+ * @param buffer source positioned at the encoded list
+ * @return the decoded list, or {@code null} when the stored count is the marker -1
+ */
+ public static List<TsTableColumnSchema> deserializeColumnSchemaList(ByteBuffer buffer) {
+ final int count = ReadWriteIOUtils.readInt(buffer);
+ // -1 is the marker that stands for a null list.
+ if (count == -1) {
+ return null;
+ }
+ final List<TsTableColumnSchema> decoded = new ArrayList<>(count);
+ int remaining = count;
+ while (remaining > 0) {
+ decoded.add(deserialize(buffer));
+ remaining--;
+ }
+ return decoded;
+ }
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b25b9486710..a05a095d18d 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -926,6 +926,15 @@ enum TTestOperation {
TEST_SUB_PROCEDURE,
}
+// Table
+// Request payload for the alterTable RPC of IConfigNodeRPCService.
+struct TAlterTableReq{
+ // NOTE(review): presumably the database that contains the table — confirm against callers.
+ 1: required string database
+ 2: required string tableName
+ // NOTE(review): looks like the id of the query that issued the ALTER — confirm.
+ 3: required string queryId
+ // NOTE(review): presumably an AlterTableOperationType code — confirm against callers.
+ 4: required byte operationType
+ // Opaque operation-specific bytes; their encoding is not defined in this file.
+ 5: required binary updateInfo
+}
+
service IConfigNodeRPCService {
// ======================================================
@@ -1584,5 +1593,7 @@ service IConfigNodeRPCService {
// ======================================================
common.TSStatus createTable(binary tableInfo)
+
+ common.TSStatus alterTable(TAlterTableReq req)
}