This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1419259c741029f07a74cb5ca1f78fa839b8f586 Author: Tian Jiang <[email protected]> AuthorDate: Thu Jan 15 18:13:41 2026 +0800 suppor schema evolution in schema region --- .../iotdb/relational/it/schema/IoTDBTableIT.java | 151 ++++++++++++++++++--- .../client/async/CnToDnAsyncRequestType.java | 2 + .../CnToDnInternalServiceAsyncRequestManager.java | 12 ++ .../rpc/DataNodeAsyncRequestRPCHandler.java | 2 + .../iotdb/confignode/manager/ConfigManager.java | 49 ++++++- .../apache/iotdb/confignode/manager/IManager.java | 6 + .../manager/partition/PartitionManager.java | 28 ++-- .../manager/schema/ClusterSchemaManager.java | 11 ++ .../schema/table/RenameTableColumnProcedure.java | 44 +++++- .../impl/schema/table/RenameTableProcedure.java | 70 +++++++++- .../state/schema/RenameTableColumnState.java | 1 + .../procedure/state/schema/RenameTableState.java | 1 + .../hash/DeviceGroupHashExecutorManualTest.java | 3 +- .../schemaregion/SchemaExecutionVisitor.java | 12 ++ .../impl/DataNodeInternalRPCServiceImpl.java | 20 +++ .../plan/analyze/ClusterPartitionFetcher.java | 14 +- .../analyze/cache/partition/PartitionCache.java | 8 +- .../execution/config/TableConfigTaskVisitor.java | 50 ++++++- .../plan/planner/distribution/SourceRewriter.java | 7 +- .../planner/plan/node/write/EvolveSchemaNode.java | 21 ++- .../planner/plan/node/write/InsertRowNode.java | 3 +- .../planner/plan/node/write/InsertRowsNode.java | 3 +- .../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +- .../planner/plan/node/write/InsertTabletNode.java | 5 +- .../plan/node/write/RelationalInsertRowsNode.java | 7 +- .../node/write/RelationalInsertTabletNode.java | 6 +- .../distribute/TableDistributedPlanGenerator.java | 26 +++- .../node/schema/CreateOrUpdateTableDeviceNode.java | 8 +- .../plan/relational/sql/ast/BooleanLiteral.java | 4 + .../plan/relational/sql/ast/RenameColumn.java | 5 - .../plan/relational/sql/ast/RenameTable.java | 5 - .../plan/scheduler/load/LoadTsFileScheduler.java | 17 ++- .../schemaengine/schemaregion/ISchemaRegion.java | 3 + .../schemaregion/SchemaRegionPlanType.java | 1 + .../schemaregion/SchemaRegionPlanVisitor.java | 5 + .../schemaregion/impl/SchemaRegionMemoryImpl.java | 38 +++++- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 6 + .../visitor/SchemaRegionPlanDeserializer.java | 7 + .../visitor/SchemaRegionPlanSerializer.java | 7 + .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 10 ++ .../write/req/SchemaRegionWritePlanFactory.java | 3 + .../db/storageengine/dataregion/DataRegion.java | 10 +- .../dataregion/tsfile/TsFileManager.java | 3 +- .../dataregion/tsfile/TsFileResource.java | 4 + .../tsfile/evolution/SchemaEvolution.java | 8 ++ .../org/apache/iotdb/db/utils/CommonUtils.java | 19 +++ .../iotdb/commons/partition/DataPartition.java | 59 +++++--- .../apache/iotdb/commons/partition/Partition.java | 7 +- .../iotdb/commons/partition/SchemaPartition.java | 11 +- .../executor/SeriesPartitionExecutor.java | 53 +++++++- .../partition/executor/hash/APHashExecutor.java | 8 +- .../partition/executor/hash/BKDRHashExecutor.java | 8 +- .../partition/executor/hash/JSHashExecutor.java | 8 +- .../partition/executor/hash/SDBMHashExecutor.java | 8 +- .../apache/iotdb/commons/schema/table/TsTable.java | 12 +- .../src/main/thrift/datanode.thrift | 8 ++ 56 files changed, 768 insertions(+), 142 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index 7915d64a680..46e23c4b0b0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -156,22 +156,6 @@ public class IoTDBTableIT { assertEquals(tableNames.length, cnt); } - // Test unsupported, to be deleted - try { - statement.execute("alter table test1.table1 rename to tableN"); - } catch (final SQLException e) { - assertEquals("701: The renaming for base table is currently unsupported", e.getMessage()); - } - - // Test unsupported, to be deleted - try { - statement.execute( - "alter table if exists test_db.table1 rename column if exists model to modelType"); - } catch (final SQLException e) { - assertEquals( - "701: The renaming for base table column is currently unsupported", e.getMessage()); - } - // Alter table properties statement.execute("alter table test1.table1 set properties ttl=1000000"); ttls = new String[] {"1000000"}; @@ -1254,4 +1238,139 @@ public class IoTDBTableIT { } } } + + @Test + public void testAlterTableName() throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute("DROP DATABASE IF EXISTS testdb"); + statement.execute("CREATE DATABASE IF NOT EXISTS testdb"); + statement.execute("USE testdb"); + + try { + statement.execute( + "CREATE TABLE IF NOT EXISTS alter_table_name_disabled () WITH (allow_alter_name=1)"); + fail("allow_alter_name must be boolean"); + } catch (SQLException e) { + assertEquals( + "701: allow_alter_name value must be a BooleanLiteral, but now is LongLiteral, value: 1", + e.getMessage()); + } + + statement.execute( + "CREATE TABLE IF NOT EXISTS alter_table_name_disabled () WITH (allow_alter_name=false)"); + + try { + statement.execute( + "ALTER TABLE alter_table_name_disabled SET PROPERTIES allow_alter_name=true"); + fail("allow_alter_name cannot be altered"); + } catch (SQLException e) { + assertEquals("701: The property allow_alter_name cannot be altered.", e.getMessage()); + } + + try { + statement.execute("ALTER TABLE alter_table_name_disabled RENAME TO alter_table_named"); + fail("the table cannot be renamed"); + } catch (SQLException e) { + assertEquals( + "701: Table 'testdb.alter_table_name_disabled' is created in a old version and cannot be renamed, please migrate its data to a new table manually", + e.getMessage()); + } + + // alter once + statement.execute("CREATE TABLE IF NOT EXISTS alter_table_name (s1 int32)"); + statement.execute("INSERT INTO alter_table_name (time, s1) VALUES (1, 1)"); + statement.execute("ALTER TABLE alter_table_name RENAME TO alter_table_named"); + try { + statement.execute("INSERT INTO alter_table_name (time, s1) VALUES (0, 0)"); + fail(); + } catch (SQLException e) { + assertEquals("550: Table 'testdb.alter_table_name' does not exist.", e.getMessage()); + } + statement.execute("INSERT INTO alter_table_named (time, s1) VALUES (2, 2)"); + + ResultSet resultSet = statement.executeQuery("SELECT * FROM alter_table_named"); + assertTrue(resultSet.next()); + assertEquals(1, resultSet.getLong(1)); + assertEquals(1, resultSet.getLong(2)); + assertTrue(resultSet.next()); + assertEquals(2, resultSet.getLong(1)); + assertEquals(2, resultSet.getLong(2)); + assertFalse(resultSet.next()); + + // alter twice + statement.execute("ALTER TABLE alter_table_named RENAME TO alter_table_named2"); + try { + statement.execute("INSERT INTO alter_table_named (time, s1) VALUES (0, 0)"); + fail(); + } catch (SQLException e) { + assertEquals("550: Table 'testdb.alter_table_named' does not exist.", e.getMessage()); + } + statement.execute("INSERT INTO alter_table_named2 (time, s1) VALUES (3, 3)"); + + resultSet = statement.executeQuery("SELECT * FROM alter_table_named2"); + for (int i = 1; i <= 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + assertEquals(i, resultSet.getLong(2)); + } + assertFalse(resultSet.next()); + + // alter back + statement.execute("ALTER TABLE alter_table_named2 RENAME TO alter_table_name"); + try { + statement.execute("INSERT INTO alter_table_named2 (time, s1) VALUES (0, 0)"); + fail(); + } catch (SQLException e) { + assertEquals("550: Table 'testdb.alter_table_named2' does not exist.", e.getMessage()); + } + statement.execute("INSERT INTO alter_table_name (time, s1) VALUES (4, 4)"); + + resultSet = statement.executeQuery("SELECT * FROM alter_table_name"); + for (int i = 1; i <= 4; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + assertEquals(i, resultSet.getLong(2)); + } + assertFalse(resultSet.next()); + } + } + + @Test + public void testAlterColumnName() throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute("DROP DATABASE IF EXISTS testdb"); + statement.execute("CREATE DATABASE IF NOT EXISTS testdb"); + statement.execute("USE testdb"); + + + statement.execute("CREATE TABLE IF NOT EXISTS alter_column_name (s1 int32)"); + statement.execute("INSERT INTO alter_column_name (time, s1) VALUES (1, 1)"); + // alter once + statement.execute("ALTER TABLE alter_column_name RENAME COLUMN s1 TO s2"); + try { + statement.execute("INSERT INTO alter_column_name (time, s1) VALUES (0, 0)"); + fail(); + } catch (SQLException e) { + assertEquals( + "616: Unknown column category for s1. Cannot auto create column.", e.getMessage()); + } + statement.execute("INSERT INTO alter_column_name (time, s2) VALUES (2, 2)"); + + ResultSet resultSet = statement.executeQuery("SELECT * FROM alter_column_name"); + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(2, metaData.getColumnCount()); + assertEquals("s2", metaData.getColumnName(2)); + + for (int i = 1; i <= 2; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + assertEquals(i, resultSet.getInt(2)); + } + assertFalse(resultSet.next()); + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index c2f8b1e9d13..5daf9977282 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -126,6 +126,8 @@ public enum CnToDnAsyncRequestType { DELETE_DATA_FOR_TABLE_DEVICE, DELETE_TABLE_DEVICE_IN_BLACK_LIST, DETECT_TREE_DEVICE_VIEW_FIELD_TYPE, + EVOLVE_DATA_REGION_SCHEMA, + EVOLVE_SCHEMA_REGION_SCHEMA, // audit log and event write-back INSERT_RECORD, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 9227325596d..e8defcbd6d1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -63,6 +63,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq; +import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq; import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteColumnDataReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq; @@ -96,6 +97,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq; import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq; import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq; +import org.apache.iotdb.mpp.rpc.thrift.TSchemaRegionEvolveSchemaReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq; @@ -439,6 +441,16 @@ public class CnToDnInternalServiceAsyncRequestManager (req, client, handler) -> client.deleteColumnData( (TDeleteColumnDataReq) req, (DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnAsyncRequestType.EVOLVE_DATA_REGION_SCHEMA, + (req, client, handler) -> + client.evolveSchemaInDataRegion( + (TDataRegionEvolveSchemaReq) req, (DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnAsyncRequestType.EVOLVE_SCHEMA_REGION_SCHEMA, + (req, client, handler) -> + client.evolveSchemaInSchemaRegion( + (TSchemaRegionEvolveSchemaReq) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnAsyncRequestType.CONSTRUCT_TABLE_DEVICE_BLACK_LIST, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 4e3fdb09f7f..ad6c94e79ff 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -228,6 +228,8 @@ public abstract class DataNodeAsyncRequestRPCHandler<Response> case DELETE_DEVICES_FOR_DROP_TABLE: case INVALIDATE_COLUMN_CACHE: case DELETE_COLUMN_DATA: + case EVOLVE_DATA_REGION_SCHEMA: + case EVOLVE_SCHEMA_REGION_SCHEMA: case CONSTRUCT_TABLE_DEVICE_BLACK_LIST: case ROLLBACK_TABLE_DEVICE_BLACK_LIST: case INVALIDATE_MATCHED_TABLE_DEVICE_CACHE: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 3c3d2304195..d85caa47dbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -47,7 +47,11 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.conf.TrimProperties; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.NoTableNameDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; @@ -107,6 +111,7 @@ import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionR import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp; import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp; import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine; +import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.load.LoadManager; @@ -255,6 +260,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -274,6 +280,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -815,9 +823,43 @@ public class ConfigManager implements IManager { return Collections.emptyList(); } } + IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(devicePath); + + SeriesPartitionKey seriesPartitionKey = getSeriesPartitionKey(deviceID, database.getFullPath()); return Collections.singletonList( - getPartitionManager() - .getSeriesPartitionSlot(IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath))); + getPartitionManager().getSeriesPartitionSlot(seriesPartitionKey)); + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + @Override + public SeriesPartitionKey getSeriesPartitionKey(IDeviceID deviceID, String databaseName) { + SeriesPartitionKey seriesPartitionKey; + boolean isTableModel = false; + try { + TDatabaseSchema databaseSchema = + getClusterSchemaManager().getDatabaseSchemaByName(databaseName); + isTableModel = databaseSchema.isTableModel; + } catch (DatabaseNotExistsException e) { + throw new IoTDBRuntimeException(e, TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()); + } + + if (isTableModel) { + try { + Optional<TsTable> tableOptional = + getClusterSchemaManager().getTableIfExists(databaseName, deviceID.getTableName()); + TsTable tsTable = tableOptional.get(); + boolean canAlterTableName = tsTable.canAlterName(); + seriesPartitionKey = + canAlterTableName + ? new NoTableNameDeviceIdKey(deviceID) + : new FullDeviceIdKey(deviceID); + } catch (NoSuchElementException | MetadataException e) { + throw new IoTDBRuntimeException(e, TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()); + } + } else { + seriesPartitionKey = new FullDeviceIdKey(deviceID); + } + return seriesPartitionKey; } @Override @@ -911,9 +953,10 @@ public class ConfigManager implements IManager { for (final IDeviceID deviceID : devicePaths) { for (final String database : databases) { if (PathUtils.isStartWith(deviceID, database)) { + SeriesPartitionKey seriesPartitionKey = getSeriesPartitionKey(deviceID, database); partitionSlotsMap .computeIfAbsent(database, key -> new HashSet<>()) - .add(getPartitionManager().getSeriesPartitionSlot(deviceID)); + .add(getPartitionManager().getSeriesPartitionSlot(seriesPartitionKey)); break; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index caa189be815..61cd26fc8fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.common.rpc.thrift.TShowAppliedConfigurationsResp; import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.confignode.audit.CNAuditLogger; @@ -162,6 +163,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.file.metadata.IDeviceID; + import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -419,6 +422,9 @@ public interface IManager { */ TSStatus deleteDatabases(TDeleteDatabasesReq tDeleteReq); + @SuppressWarnings("OptionalGetWithoutIsPresent") + SeriesPartitionKey getSeriesPartitionKey(IDeviceID deviceID, String databaseName); + /** * Get SchemaPartition. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 576d805c786..f76fb542132 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -1055,11 +1056,11 @@ public class PartitionManager { /** * Get TSeriesPartitionSlot. * - * @param deviceID IDeviceID + * @param key IDeviceID * @return SeriesPartitionSlot */ - public TSeriesPartitionSlot getSeriesPartitionSlot(final IDeviceID deviceID) { - return executor.getSeriesPartitionSlot(deviceID); + public TSeriesPartitionSlot getSeriesPartitionSlot(final SeriesPartitionKey key) { + return executor.getSeriesPartitionSlot(key); } public RegionInfoListResp getRegionInfoList(final GetRegionInfoListPlan req) { @@ -1152,8 +1153,11 @@ public class PartitionManager { } else { final IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice())); - plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID)); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID)); + String databaseName = getClusterSchemaManager().getDatabaseNameByDevice(deviceID); + plan.setDatabase(databaseName); + SeriesPartitionKey seriesPartitionKey = + configManager.getSeriesPartitionKey(deviceID, databaseName); + plan.setSeriesSlotId(executor.getSeriesPartitionSlot(seriesPartitionKey)); } if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified @@ -1189,8 +1193,11 @@ public class PartitionManager { } else if (req.isSetDevice()) { IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice())); - plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID)); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID)); + String databaseName = getClusterSchemaManager().getDatabaseNameByDevice(deviceID); + plan.setDatabase(databaseName); + SeriesPartitionKey seriesPartitionKey = + configManager.getSeriesPartitionKey(deviceID, databaseName); + plan.setSeriesSlotId(executor.getSeriesPartitionSlot(seriesPartitionKey)); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new GetTimeSlotListResp(RpcUtils.SUCCESS_STATUS, new ArrayList<>()); @@ -1218,8 +1225,11 @@ public class PartitionManager { } else if (req.isSetDevice()) { IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(ByteBuffer.wrap(req.getDevice())); - plan.setDatabase(getClusterSchemaManager().getDatabaseNameByDevice(deviceID)); - plan.setSeriesSlotId(executor.getSeriesPartitionSlot(deviceID)); + String databaseName = getClusterSchemaManager().getDatabaseNameByDevice(deviceID); + plan.setDatabase(databaseName); + SeriesPartitionKey seriesPartitionKey = + configManager.getSeriesPartitionKey(deviceID, databaseName); + plan.setSeriesSlotId(executor.getSeriesPartitionSlot(seriesPartitionKey)); if (Objects.equals(plan.getDatabase(), "")) { // Return empty result if Database not specified return new CountTimeSlotListResp(RpcUtils.SUCCESS_STATUS, 0); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index 490370e38c3..93910947a35 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -1486,6 +1486,17 @@ public class ClusterSchemaManager { null); } + if (!originalTable.canAlterName()) { + return new Pair<>( + RpcUtils.getStatus( + TSStatusCode.SEMANTIC_ERROR, + String.format( + "Table '%s.%s' is created in a old version and cannot be renamed, " + + "please migrate its data to a new table manually", + database, tableName)), + null); + } + final Optional<Pair<TSStatus, TsTable>> result = checkTable4View(database, originalTable, isTableView); if (result.isPresent()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java index da51dad2672..9bcb651dd42 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableColumnProcedure.java @@ -19,10 +19,13 @@ package org.apache.iotdb.confignode.procedure.impl.schema.table; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.consensus.request.write.table.RenameTableColumnPlan; import org.apache.iotdb.confignode.consensus.request.write.table.view.RenameViewColumnPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -30,9 +33,13 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.schema.table.view.RenameViewColumnProcedure; import org.apache.iotdb.confignode.procedure.state.schema.RenameTableColumnState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +47,10 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; public class RenameTableColumnProcedure @@ -84,6 +95,10 @@ public class RenameTableColumnProcedure LOGGER.info("Rename column to table {}.{} on config node", database, tableName); renameColumn(env); break; + case EXECUTE_ON_REGION: + LOGGER.info("Rename column to table {}.{} on data regions", database, tableName); + executeOnRegions(env); + break; case COMMIT_RELEASE: LOGGER.info( "Commit release info of table {}.{} when renaming column", database, tableName); @@ -141,10 +156,37 @@ public class RenameTableColumnProcedure if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setFailure(new ProcedureException(new IoTDBException(status))); } else { - setNextState(RenameTableColumnState.COMMIT_RELEASE); + setNextState(RenameTableColumnState.EXECUTE_ON_REGION); } } + private void executeOnRegions(final ConfigNodeProcedureEnv env) { + final Map<TConsensusGroupId, TRegionReplicaSet> relatedRegionGroup = + env.getConfigManager().getRelatedDataRegionGroup4TableModel(database); + + if (!relatedRegionGroup.isEmpty()) { + List<SchemaEvolution> schemaEvolutions = + Collections.singletonList(new ColumnRename(tableName, oldName, newName)); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + SchemaEvolution.serializeList(schemaEvolutions, publicBAOS); + } catch (IOException ignored) { + } + ByteBuffer byteBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + new TableRegionTaskExecutor<>( + "evolve data region schema", + env, + relatedRegionGroup, + CnToDnAsyncRequestType.EVOLVE_DATA_REGION_SCHEMA, + ((dataNodeLocation, consensusGroupIdList) -> + new TDataRegionEvolveSchemaReq( + new ArrayList<>(consensusGroupIdList), byteBuffer))) + .execute(); + } + + setNextState(RenameTableColumnState.COMMIT_RELEASE); + } + @Override protected void rollbackState(final ConfigNodeProcedureEnv env, final RenameTableColumnState state) throws IOException, InterruptedException, ProcedureException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableProcedure.java index 93d1035a761..bf91e168fe3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/RenameTableProcedure.java @@ -19,10 +19,13 @@ package org.apache.iotdb.confignode.procedure.impl.schema.table; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.consensus.request.write.table.RenameTablePlan; import org.apache.iotdb.confignode.consensus.request.write.table.view.RenameViewPlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -30,9 +33,14 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.schema.table.view.RenameViewProcedure; import org.apache.iotdb.confignode.procedure.state.schema.RenameTableState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; +import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq; +import org.apache.iotdb.mpp.rpc.thrift.TSchemaRegionEvolveSchemaReq; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +48,10 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; public class RenameTableProcedure extends AbstractAlterOrDropTableProcedure<RenameTableState> { private static final Logger LOGGER = LoggerFactory.getLogger(RenameTableProcedure.class); @@ -74,9 +86,13 @@ public class RenameTableProcedure extends AbstractAlterOrDropTableProcedure<Rena preRelease(env); break; case RENAME_TABLE: - LOGGER.info("Rename column to table {}.{} on config node", database, tableName); + LOGGER.info("Rename table {}.{} on config node", database, tableName); renameTable(env); break; + case EXECUTE_ON_REGIONS: + LOGGER.info("Rename table {}.{} on regions", database, tableName); + executeOnRegions(env); + break; case COMMIT_RELEASE: LOGGER.info( "Commit release info of table {}.{} when renaming table", database, tableName); @@ -134,8 +150,58 @@ public class RenameTableProcedure extends AbstractAlterOrDropTableProcedure<Rena if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setFailure(new ProcedureException(new IoTDBException(status))); } else { - setNextState(RenameTableState.COMMIT_RELEASE); + setNextState(RenameTableState.EXECUTE_ON_REGIONS); + } + } + + private void executeOnRegions(final ConfigNodeProcedureEnv env) { + final Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup = + env.getConfigManager().getRelatedDataRegionGroup4TableModel(database); + + if (!relatedDataRegionGroup.isEmpty()) { + List<SchemaEvolution> schemaEvolutions = + Collections.singletonList(new TableRename(tableName, newName)); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + SchemaEvolution.serializeList(schemaEvolutions, publicBAOS); + } catch (IOException ignored) { + } + ByteBuffer byteBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + new TableRegionTaskExecutor<>( + "evolve data region schema", + env, + relatedDataRegionGroup, + CnToDnAsyncRequestType.EVOLVE_DATA_REGION_SCHEMA, + ((dataNodeLocation, consensusGroupIdList) -> + new TDataRegionEvolveSchemaReq( + new ArrayList<>(consensusGroupIdList), byteBuffer))) + .execute(); } + + final Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup = + env.getConfigManager().getRelatedSchemaRegionGroup4TableModel(database); + + if (!relatedSchemaRegionGroup.isEmpty()) { + List<SchemaEvolution> schemaEvolutions = + Collections.singletonList(new TableRename(tableName, newName)); + PublicBAOS publicBAOS = new PublicBAOS(); + try { + SchemaEvolution.serializeList(schemaEvolutions, publicBAOS); + } catch (IOException ignored) { + } + ByteBuffer byteBuffer = ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); + new TableRegionTaskExecutor<>( + "evolve schema region schema", + env, + relatedSchemaRegionGroup, + CnToDnAsyncRequestType.EVOLVE_SCHEMA_REGION_SCHEMA, + ((dataNodeLocation, consensusGroupIdList) -> + new TSchemaRegionEvolveSchemaReq( + new ArrayList<>(consensusGroupIdList), byteBuffer))) + .execute(); + } + + setNextState(RenameTableState.COMMIT_RELEASE); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableColumnState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableColumnState.java index 398ef642224..83428eadd6d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableColumnState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableColumnState.java @@ -23,5 +23,6 @@ public enum RenameTableColumnState { COLUMN_CHECK, PRE_RELEASE, RENAME_COLUMN, + EXECUTE_ON_REGION, COMMIT_RELEASE } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableState.java index 1c71cb50182..6f9e2c29558 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/RenameTableState.java @@ -23,5 +23,6 @@ public enum RenameTableState { COLUMN_CHECK, PRE_RELEASE, RENAME_TABLE, + EXECUTE_ON_REGIONS, COMMIT_RELEASE } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java index 628cc02524c..3cd19850cb3 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.confignode.manager.hash; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; @@ -74,7 +75,7 @@ public class DeviceGroupHashExecutorManualTest { List<IDeviceID> devices = genBatchDevices(); totalTime -= System.currentTimeMillis(); for (IDeviceID device : devices) { - bucket[manager.getSeriesPartitionSlot(device).getSlotId()] += 1; + bucket[manager.getSeriesPartitionSlot(new FullDeviceIdKey(device)).getSlotId()] += 1; } totalTime += System.currentTimeMillis(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index 261b4908a90..63747b6c922 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -61,6 +61,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode; @@ -854,6 +855,17 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + @Override + public TSStatus visitEvolveSchemaNode(EvolveSchemaNode node, ISchemaRegion schemaRegion) { + try { + schemaRegion.applySchemaEvolution(node); + } catch (MetadataException e) { + logMetaDataException(e); + return RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); + } + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + @Override public TSStatus visitPlan(final PlanNode node, final ISchemaRegion context) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index bb66664b4ed..e70679ef1e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -298,6 +298,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq; import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest; import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse; +import org.apache.iotdb.mpp.rpc.thrift.TSchemaRegionEvolveSchemaReq; import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq; import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp; import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; @@ -816,6 +817,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface .getStatus()); } + @Override + public TSStatus evolveSchemaInSchemaRegion(final TSchemaRegionEvolveSchemaReq req) { + final List<SchemaEvolution> schemaEvolutions = + SchemaEvolution.createListFrom(req.schemaEvolutions); + return executeInternalSchemaTask( + req.getSchemaRegionIdList(), + consensusGroupId -> + new RegionWriteExecutor() + .execute( + new SchemaRegionId(consensusGroupId.getId()), + // Now the deletion plan may be re-collected here by pipe, resulting multiple + // transfer to delete time series plan. Now just ignore. + req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe() + ? new PipeEnrichedEvolveSchemaNode( + new EvolveSchemaNode(new PlanNodeId(""), schemaEvolutions)) + : new EvolveSchemaNode(new PlanNodeId(""), schemaEvolutions)) + .getStatus()); + } + @Override public TSStatus deleteTimeSeries(final TDeleteTimeSeriesReq req) throws TException { final PathPatternTree patternTree = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java index 2274762341b..3333bb6fe2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java @@ -47,6 +47,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -347,6 +348,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { final List<TSeriesPartitionSlot> partitionSlots = Objects.nonNull(deviceIDs) ? deviceIDs.stream() + .map(deviceID -> CommonUtils.getSeriesPartitionKey(deviceID, database)) .map(partitionExecutor::getSeriesPartitionSlot) .distinct() .collect(Collectors.toList()) @@ -458,6 +460,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { final Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>(); for (final Map.Entry<String, List<DataPartitionQueryParam>> entry : sgNameToQueryParamsMap.entrySet()) { + String databaseName = entry.getKey(); // for each sg final Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>(); @@ -467,7 +470,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { for (final DataPartitionQueryParam queryParam : entry.getValue()) { seriesSlotTimePartitionMap .computeIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDeviceID()), + partitionExecutor.getSeriesPartitionSlot( + CommonUtils.getSeriesPartitionKey(queryParam.getDeviceID(), databaseName)), k -> new ComplexTimeSlotList( queryParam.isNeedLeftAll(), queryParam.isNeedRightAll())) @@ -479,7 +483,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { k, new TTimeSlotList( new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll))); - partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); + partitionSlotsMap.put(databaseName, deviceToTimePartitionMap); } return new TDataPartitionReq(partitionSlotsMap); } @@ -491,6 +495,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { TTimeSlotList sharedTTimeSlotList = null; for (final Map.Entry<String, List<DataPartitionQueryParam>> entry : sgNameToQueryParamsMap.entrySet()) { + String databaseName = entry.getKey(); // for each sg final Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>(); @@ -503,10 +508,11 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { queryParam.isNeedRightAll()); } deviceToTimePartitionMap.putIfAbsent( - partitionExecutor.getSeriesPartitionSlot(queryParam.getDeviceID()), + partitionExecutor.getSeriesPartitionSlot( + CommonUtils.getSeriesPartitionKey(queryParam.getDeviceID(), databaseName)), sharedTTimeSlotList); } - partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); + partitionSlotsMap.put(databaseName, deviceToTimePartitionMap); } return new TDataPartitionReq(partitionSlotsMap); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 3c76bd08c1d..36ba860ad2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaUtils; import org.apache.iotdb.db.service.metrics.CacheMetrics; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; @@ -707,7 +708,8 @@ public class PartitionCache { List<TConsensusGroupId> consensusGroupIds = new ArrayList<>(entry.getValue().size()); for (final IDeviceID device : entry.getValue()) { final TSeriesPartitionSlot seriesPartitionSlot = - partitionExecutor.getSeriesPartitionSlot(device); + partitionExecutor.getSeriesPartitionSlot( + CommonUtils.getSeriesPartitionKey(device, databaseName)); if (!map.containsKey(seriesPartitionSlot)) { // if one device not find, then return cache miss. if (logger.isDebugEnabled()) { @@ -874,7 +876,9 @@ public class PartitionCache { for (DataPartitionQueryParam param : params) { TSeriesPartitionSlot seriesPartitionSlot; if (null != param.getDeviceID()) { - seriesPartitionSlot = partitionExecutor.getSeriesPartitionSlot(param.getDeviceID()); + seriesPartitionSlot = + partitionExecutor.getSeriesPartitionSlot( + CommonUtils.getSeriesPartitionKey(param.getDeviceID(), databaseName)); } else { return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 4b609aa87eb..3cdd5525f1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -141,6 +141,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterColumnDataTy import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; @@ -268,7 +269,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENG import static org.apache.iotdb.commons.conf.IoTDBConstant.TTL_INFINITE; import static org.apache.iotdb.commons.executable.ExecutableManager.getUnTrustedUriErrorMsg; import static org.apache.iotdb.commons.executable.ExecutableManager.isUriTrusted; -import static org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES; +import static org.apache.iotdb.commons.schema.table.TsTable.ALLOW_ALTER_NAME_PROPERTY; import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; import static org.apache.iotdb.commons.schema.table.TsTable.TTL_PROPERTY; import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask.DATA_REGION_GROUP_NUM_KEY; @@ -567,7 +568,10 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont final TsTable table = new TsTable(tableName); - table.setProps(convertPropertiesToMap(node.getProperties(), false)); + Map<String, String> properties = convertPropertiesToMap(node.getProperties(), false); + // new tables' names can be altered by default + properties.putIfAbsent(ALLOW_ALTER_NAME_PROPERTY, "true"); + table.setProps(properties); if (Objects.nonNull(node.getComment())) { table.addProp(TsTable.COMMENT_KEY, node.getComment()); } @@ -779,10 +783,15 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont accessControl.checkCanAlterTable( context.getSession().getUserName(), new QualifiedObjectName(database, tableName), context); + Map<String, String> properties = convertPropertiesToMap(node.getProperties(), true); + if (properties.containsKey(ALLOW_ALTER_NAME_PROPERTY)) { + throw new SemanticException( + "The property " + ALLOW_ALTER_NAME_PROPERTY + " cannot be altered."); + } return new AlterTableSetPropertiesTask( database, tableName, - convertPropertiesToMap(node.getProperties(), true), + properties, context.getQueryId().getId(), node.ifExists(), node.getType() == SetProperties.Type.TREE_VIEW); @@ -859,7 +868,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont final Map<String, String> map = new HashMap<>(); for (final Property property : propertyList) { final String key = property.getName().getValue().toLowerCase(Locale.ENGLISH); - if (TABLE_ALLOWED_PROPERTIES.contains(key)) { + if (TTL_PROPERTY.equals(key)) { if (!property.isSetToDefault()) { final Expression value = property.getNonDefaultValue(); final Optional<String> strValue = parseStringFromLiteralIfBinary(value); @@ -876,6 +885,27 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont } else if (serializeDefault) { map.put(key, null); } + } else if (ALLOW_ALTER_NAME_PROPERTY.equals(key)) { + if (property.isSetToDefault()) { + // no such property, the table is from an older version and its table name + // cannot be altered + map.put(key, "false"); + } else { + Expression value = property.getNonDefaultValue(); + final Optional<String> strValue = parseStringFromLiteralIfBinary(value); + if (strValue.isPresent()) { + try { + boolean ignored = Boolean.parseBoolean(strValue.get()); + } catch (Exception e) { + throw new SemanticException( + ALLOW_ALTER_NAME_PROPERTY + " value must be a boolean, but now is: " + value); + } + map.put(key, strValue.get()); + continue; + } + // TODO: support validation for other properties + map.put(key, String.valueOf(parseBooleanFromLiteral(value, ALLOW_ALTER_NAME_PROPERTY))); + } } else { throw new SemanticException("Table property '" + key + "' is currently not allowed."); } @@ -1073,6 +1103,18 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont return parsedValue; } + private boolean parseBooleanFromLiteral(final Object value, final String name) { + if (!(value instanceof BooleanLiteral)) { + throw new SemanticException( + name + + " value must be a BooleanLiteral, but now is " + + (Objects.nonNull(value) ? value.getClass().getSimpleName() : null) + + ", value: " + + value); + } + return ((BooleanLiteral) value).getParsedValue(); + } + private int parseIntFromLiteral(final Object value, final String name) { if (!(value instanceof LongLiteral)) { throw new SemanticException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index bc6475b2323..c3839a2e630 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.SchemaConstant; @@ -84,6 +85,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParame import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.constant.SqlConstant; import com.google.common.collect.ImmutableList; @@ -1442,7 +1444,10 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> Map<Integer, List<TRegionReplicaSet>> slot2ReplicasMap = cache.computeIfAbsent(db, k -> new HashMap<>()); - TSeriesPartitionSlot tSeriesPartitionSlot = dataPartition.calculateDeviceGroupId(deviceID); + + SeriesPartitionKey seriesPartitionKey = CommonUtils.getSeriesPartitionKey(deviceID, db); + TSeriesPartitionSlot tSeriesPartitionSlot = + dataPartition.calculateDeviceGroupId(seriesPartitionKey); Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> finalSeriesPartitionMap = seriesPartitionMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java index b4fc82e0710..834fd05887d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java @@ -27,6 +27,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionPlan; +import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanType; +import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanVisitor; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; @@ -45,13 +48,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -public class EvolveSchemaNode extends SearchNode implements WALEntryValue { +public class EvolveSchemaNode extends SearchNode implements WALEntryValue, ISchemaRegionPlan { private static final Logger LOGGER = LoggerFactory.getLogger(EvolveSchemaNode.class); protected TRegionReplicaSet regionReplicaSet; protected ProgressIndex progressIndex; - private final List<SchemaEvolution> schemaEvolutions; + private List<SchemaEvolution> schemaEvolutions; + + public EvolveSchemaNode() { + super(new PlanNodeId("")); + } public EvolveSchemaNode(PlanNodeId id, List<SchemaEvolution> schemaEvolutions) { super(id); @@ -186,4 +193,14 @@ public class EvolveSchemaNode extends SearchNode implements WALEntryValue { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitEvolveSchemaNode(this, context); } + + @Override + public SchemaRegionPlanType getPlanType() { + return SchemaRegionPlanType.EVOLVE_SCHEMA; + } + + @Override + public <R, C> R accept(SchemaRegionPlanVisitor<R, C> visitor, C context) { + return visitor.visitEvolveSchema(this, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index a2bd6cb1a00..ffe5a3b99cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -129,7 +130,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - getDeviceID(), timePartitionSlot, analysis.getDatabaseName()); + new FullDeviceIdKey(getDeviceID()), timePartitionSlot, analysis.getDatabaseName()); // collect redirectInfo analysis.setRedirectNodeList( Collections.singletonList( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 7392b761270..7494f62bc87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.DataTypeInconsistentException; @@ -276,7 +277,7 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue { analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - insertRowNode.targetPath.getIDeviceIDAsFullDevice(), + new FullDeviceIdKey(insertRowNode.targetPath.getIDeviceIDAsFullDevice()), TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()), null); // Collect redirectInfo diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index f1e28d32b10..f54d62a7a2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -173,7 +174,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode { analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - targetPath.getIDeviceIDAsFullDevice(), + new FullDeviceIdKey(targetPath.getIDeviceIDAsFullDevice()), timePartitionSlot, analysis.getDatabaseName()); Map<TTimePartitionSlot, List<InsertRowNode>> tmpMap = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 39683e5d9f9..c6991316447 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; @@ -288,7 +289,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - deviceID, splitInfo.timePartitionSlots, analysis.getDatabaseName()); + new FullDeviceIdKey(deviceID), + splitInfo.timePartitionSlots, + analysis.getDatabaseName()); splitInfo.replicaSets = replicaSets; // collect redirectInfo analysis.addEndPointToRedirectNodeList( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index cd53d65fe5a..de397125d7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.DataTypeInconsistentException; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; @@ -174,11 +176,14 @@ public class RelationalInsertRowsNode extends InsertRowsNode { InsertRowNode insertRowNode = getInsertRowNodeList().get(i); // Data region for insert row node // each row may belong to different database, pass null for auto-detection + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey( + insertRowNode.getDeviceID(), analysis.getDatabaseName()); TRegionReplicaSet dataRegionReplicaSet = analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - insertRowNode.getDeviceID(), + seriesPartitionKey, TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()), analysis.getDatabaseName()); // handle object type diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 2bad744cfb9..3987818fff1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.utils.TestOnly; @@ -37,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.IObjectPath; import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; @@ -210,11 +212,13 @@ public class RelationalInsertTabletNode extends InsertTabletNode { for (final Map.Entry<IDeviceID, PartitionSplitInfo> entry : deviceIDSplitInfoMap.entrySet()) { final IDeviceID deviceID = entry.getKey(); final PartitionSplitInfo splitInfo = entry.getValue(); + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey(deviceID, analysis.getDatabaseName()); final List<TRegionReplicaSet> replicaSets = analysis .getDataPartitionInfo() .getDataRegionReplicaSetForWriting( - deviceID, splitInfo.timePartitionSlots, analysis.getDatabaseName()); + seriesPartitionKey, splitInfo.timePartitionSlots, analysis.getDatabaseName()); splitInfo.replicaSets = replicaSets; // collect redirectInfo endPointMap.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index e13d09de4a5..81a67ddd991 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -101,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.collect.ImmutableList; @@ -711,7 +713,8 @@ public class TableDistributedPlanGenerator seriesSlotMap, deviceEntry.getDeviceID(), node.getTimeFilter(), - cachedSeriesSlotWithRegions); + cachedSeriesSlotWithRegions, + dbName); regionReplicaSets.forEach( regionReplicaSet -> regionDeviceCount.put( @@ -802,7 +805,8 @@ public class TableDistributedPlanGenerator seriesSlotMap, deviceEntry.getDeviceID(), node.getTimeFilter(), - cachedSeriesSlotWithRegions); + cachedSeriesSlotWithRegions, + dbName); if (regionReplicaSets.size() > 1) { context.deviceCrossRegion = true; } @@ -892,7 +896,8 @@ public class TableDistributedPlanGenerator seriesSlotMap, deviceEntry.getDeviceID(), node.getTimeFilter(), - cachedSeriesSlotWithRegions); + cachedSeriesSlotWithRegions, + dbName); if (regionReplicaSets.size() > 1) { context.deviceCrossRegion = true; @@ -1212,7 +1217,8 @@ public class TableDistributedPlanGenerator seriesSlotMap, deviceEntry.getDeviceID(), node.getTimeFilter(), - cachedSeriesSlotWithRegions); + cachedSeriesSlotWithRegions, + dbName); if (regionReplicaSets.size() > 1) { needSplit = true; context.deviceCrossRegion = true; @@ -1295,10 +1301,14 @@ public class TableDistributedPlanGenerator Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesSlotMap, IDeviceID deviceId, Filter timeFilter, - Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions) { + Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions, + String databaseName) { // given seriesPartitionSlot has already been calculated - final TSeriesPartitionSlot seriesPartitionSlot = dataPartition.calculateDeviceGroupId(deviceId); + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey(deviceId, databaseName); + final TSeriesPartitionSlot seriesPartitionSlot = + dataPartition.calculateDeviceGroupId(seriesPartitionKey); List<TRegionReplicaSet> regionReplicaSets = cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId()); if (regionReplicaSets != null) { @@ -1777,8 +1787,10 @@ public class TableDistributedPlanGenerator final List<IDeviceID> partitionKeyList = node.getPartitionKeyList(); final List<Object[]> deviceIDArray = node.getDeviceIdList(); for (int i = 0; i < node.getPartitionKeyList().size(); ++i) { + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey(partitionKeyList.get(i), database); final TRegionReplicaSet regionReplicaSet = - databaseMap.get(schemaPartition.calculateDeviceGroupId(partitionKeyList.get(i))); + databaseMap.get(schemaPartition.calculateDeviceGroupId(seriesPartitionKey)); if (Objects.nonNull(regionReplicaSet)) { tableDeviceFetchMap .computeIfAbsent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java index be4b1337e7e..eb9e4a25b96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/schema/CreateOrUpdateTableDeviceNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -29,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegionPlan; import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanType; import org.apache.iotdb.db.schemaengine.schemaregion.SchemaRegionPlanVisitor; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -260,10 +262,10 @@ public class CreateOrUpdateTableDeviceNode extends WritePlanNode implements ISch final List<IDeviceID> partitionKeyList = getPartitionKeyList(); for (int i = 0; i < partitionKeyList.size(); i++) { // Use the string literal of deviceId as the partition key + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey(partitionKeyList.get(i), database); final TRegionReplicaSet regionReplicaSet = - analysis - .getSchemaPartitionInfo() - .getSchemaRegionReplicaSet(database, partitionKeyList.get(i)); + analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(database, seriesPartitionKey); splitMap.computeIfAbsent(regionReplicaSet, k -> new ArrayList<>()).add(i); } final List<WritePlanNode> result = new ArrayList<>(splitMap.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/BooleanLiteral.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/BooleanLiteral.java index 42f62559a72..c8a06a501be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/BooleanLiteral.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/BooleanLiteral.java @@ -119,4 +119,8 @@ public class BooleanLiteral extends Literal { return INSTANCE_SIZE + AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); } + + public boolean getParsedValue() { + return value; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java index 5e1277316a4..cbf2a561645 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameColumn.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; -import org.apache.iotdb.db.exception.sql.SemanticException; - import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.RamUsageEstimator; @@ -57,9 +55,6 @@ public final class RenameColumn extends Statement { this.tableIfExists = tableIfExists; this.columnIfNotExists = columnIfNotExists; this.view = view; - if (!view) { - throw new SemanticException("The renaming for base table column is currently unsupported"); - } } public QualifiedName getTable() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java index 77c3296fee0..5919dec7d03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RenameTable.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; -import org.apache.iotdb.db.exception.sql.SemanticException; - import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.RamUsageEstimator; @@ -51,9 +49,6 @@ public class RenameTable extends Statement { this.target = requireNonNull(target, "target name is null"); this.tableIfExists = tableIfExists; this.view = view; - if (!view) { - throw new SemanticException("The renaming for base table is currently unsupported"); - } } public QualifiedName getSource() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 377f00664c0..2ee200c7454 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -72,6 +73,7 @@ import org.apache.iotdb.db.storageengine.load.splitter.ChunkData; import org.apache.iotdb.db.storageengine.load.splitter.DeletionData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter; +import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -846,12 +848,15 @@ public class LoadTsFileScheduler implements IScheduler { subSlotList.stream() .map( pair -> - // (database != null) means this file will be loaded into table-model - database != null - ? dataPartition.getDataRegionReplicaSetForWriting( - pair.left, pair.right, database) - : dataPartition.getDataRegionReplicaSetForWriting( - pair.left, pair.right)) + // (database != null) means this file will be loaded into table-model + { + SeriesPartitionKey seriesPartitionKey = + CommonUtils.getSeriesPartitionKey(pair.left, database); + return database != null + ? dataPartition.getDataRegionReplicaSetForWriting( + seriesPartitionKey, pair.right, database) + : dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right); + }) .collect(Collectors.toList())); } return replicaSets; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java index 4a11165353c..578b0a1e862 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/ISchemaRegion.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.schema.template.Template; import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; @@ -418,5 +419,7 @@ public interface ISchemaRegion { void addNodeLocation(final TableNodeLocationAddNode node) throws MetadataException; + void applySchemaEvolution(EvolveSchemaNode schemaEvolutions) throws MetadataException; + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanType.java index b0d41c725ee..8b058742ea3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanType.java @@ -63,6 +63,7 @@ public enum SchemaRegionPlanType { ROLLBACK_TABLE_DEVICES_BLACK_LIST((byte) 106), DELETE_TABLE_DEVICES_IN_BLACK_LIST((byte) 107), DROP_TABLE_ATTRIBUTE((byte) 108), + EVOLVE_SCHEMA((byte) 109), // query plan doesn't need any ser/deSer, thus use one type to represent all READ_SCHEMA(Byte.MAX_VALUE); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanVisitor.java index 0cf087a40e1..e97b0942918 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/SchemaRegionPlanVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode; @@ -185,4 +186,8 @@ public abstract class SchemaRegionPlanVisitor<R, C> { final AlterEncodingCompressorNode alterEncodingCompressorNode, final C context) { return visitSchemaRegionPlan(alterEncodingCompressorNode, context); } + + public R visitEvolveSchema(final EvolveSchemaNode evolveSchemaNode, final C context) { + return visitSchemaRegionPlan(evolveSchemaNode, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 230ed8330ca..4baa250d786 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -49,10 +49,12 @@ import org.apache.iotdb.db.queryengine.execution.operator.schema.source.DeviceAt import org.apache.iotdb.db.queryengine.execution.operator.schema.source.DeviceBlackListConstructor; import org.apache.iotdb.db.queryengine.execution.operator.schema.source.TableDeviceQuerySource; import org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder; +import org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder.Context; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; @@ -125,6 +127,8 @@ import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IDeleteLogic import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IPreDeleteLogicalViewPlan; import org.apache.iotdb.db.schemaengine.schemaregion.write.req.view.IRollbackPreDeleteLogicalViewPlan; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.SchemaUtils; @@ -1555,7 +1559,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { Objects.nonNull(predicate) ? visitor.process( predicate, - new ColumnTransformerBuilder.Context( + new Context( sessionInfo, filterLeafColumnTransformerList, inputLocations, @@ -1582,8 +1586,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { // records common ColumnTransformer between filter and project expressions final List<ColumnTransformer> commonTransformerList = new ArrayList<>(); - final ColumnTransformerBuilder.Context projectColumnTransformerContext = - new ColumnTransformerBuilder.Context( + final Context projectColumnTransformerContext = + new Context( sessionInfo, projectLeafColumnTransformerList, inputLocations, @@ -1782,6 +1786,23 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { } } + @Override + public void applySchemaEvolution(EvolveSchemaNode node) throws MetadataException { + for (SchemaEvolution schemaEvolution : node.getSchemaEvolutions()) { + if (schemaEvolution instanceof TableRename) { + TableRename tableRename = (TableRename) schemaEvolution; + applyTableRename(tableRename.getNameBefore(), tableRename.getNameAfter()); + } else { + logger.warn("Unsupported schemaEvolution {}, ignore it", schemaEvolution); + } + } + writeToMLog(node); + } + + public void applyTableRename(String oldName, String newName) { + mTree.renameTable(oldName, newName); + } + // endregion private static class RecoverOperationResult { @@ -2115,5 +2136,16 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { return new RecoverOperationResult(e); } } + + @Override + public RecoverOperationResult visitEvolveSchema( + EvolveSchemaNode evolveSchemaNode, SchemaRegionMemoryImpl context) { + try { + applySchemaEvolution(evolveSchemaNode); + return RecoverOperationResult.SUCCESS; + } catch (final MetadataException e) { + return new RecoverOperationResult(e); + } + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 2f4bec896dd..5ebc9d52f9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException; import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; @@ -1583,6 +1584,11 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { throw new UnsupportedOperationException(); } + @Override + public void applySchemaEvolution(EvolveSchemaNode schemaEvolutions) { + throw new UnsupportedOperationException(); + } + // endregion private static class RecoverOperationResult { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanDeserializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanDeserializer.java index 163ccb4e59d..eb2b0722526 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanDeserializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanDeserializer.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode; @@ -482,5 +483,11 @@ public class SchemaRegionPlanDeserializer implements IDeserializer<ISchemaRegion final AlterEncodingCompressorNode alterEncodingCompressorNode, final ByteBuffer buffer) { return (AlterEncodingCompressorNode) PlanNodeType.deserialize(buffer); } + + @Override + public ISchemaRegionPlan visitEvolveSchema( + EvolveSchemaNode evolveSchemaNode, ByteBuffer buffer) { + return (EvolveSchemaNode) PlanNodeType.deserialize(buffer); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanSerializer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanSerializer.java index b7b7d9758ca..d65b18f18af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanSerializer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/visitor/SchemaRegionPlanSerializer.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode; @@ -553,6 +554,12 @@ public class SchemaRegionPlanSerializer implements ISerializer<ISchemaRegionPlan return visitPlanNode(alterEncodingCompressorNode, outputStream); } + @Override + public SchemaRegionPlanSerializationResult visitEvolveSchema( + EvolveSchemaNode evolveSchemaNode, DataOutputStream outputStream) { + return visitPlanNode(evolveSchemaNode, outputStream); + } + private SchemaRegionPlanSerializationResult visitPlanNode( final PlanNode planNode, final DataOutputStream outputStream) { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index 9aec147ace1..82dc65cd338 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -1703,6 +1703,16 @@ public class MTreeBelowSGMemoryImpl { return notExistNum; } + public synchronized void renameTable(final String oldTableName, final String newTableName) { + IMemMNode tableNode = databaseMNode.deleteChild(oldTableName); + if (tableNode == null) { + LOGGER.warn("Renaming a non-existing table {}.", oldTableName); + return; + } + tableNode.setName(newTableName); + databaseMNode.addChild(tableNode); + } + public void createOrUpdateTableDevice( final String tableName, final String[] devicePath, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java index 8a87a6f3829..5c653f42e9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/SchemaRegionWritePlanFactory.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.AlterEncodingCompressorNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.ConstructTableDevicesBlackListNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.CreateOrUpdateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.DeleteTableDeviceNode; @@ -124,6 +125,8 @@ public class SchemaRegionWritePlanFactory { return DeleteTableDevicesInBlackListNode.MOCK_INSTANCE; case DROP_TABLE_ATTRIBUTE: return TableAttributeColumnDropNode.MOCK_INSTANCE; + case EVOLVE_SCHEMA: + return new EvolveSchemaNode(); default: throw new UnsupportedOperationException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 4f6ddb7f46d..be0f7c3e5ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1263,7 +1263,6 @@ public class DataRegion implements IDataRegionForQuery { if (deleted) { return; } - DataNodeTableCache.getInstance().invalid(databaseName); syncCloseAllWorkingTsFileProcessors(); @@ -2942,14 +2941,7 @@ public class DataRegion implements IDataRegionForQuery { List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); for (TsFileResource tsFileResource : tsFileResources) { - EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(); - IDeviceID deviceIdBackThen = singleDeviceId; - if (evolvedSchema != null) { - deviceIdBackThen = evolvedSchema.rewriteToOriginal(singleDeviceId); - } - - if (!tsFileResource.isSatisfied( - deviceIdBackThen, globalTimeFilter, isSeq, context.isDebug())) { + if (!tsFileResource.isSatisfied(singleDeviceId, globalTimeFilter, isSeq, context.isDebug())) { continue; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 610365cbfb7..8f74c561a88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -32,6 +32,7 @@ import org.apache.tsfile.utils.Pair; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -530,7 +531,7 @@ public class TsFileManager { long partitionId, long minFileVersionIncluded, long maxFileVersionExcluded) { readLock(); try { - List<TsFileSet> tsFileSetList = tsfileSets.get(partitionId); + List<TsFileSet> tsFileSetList = tsfileSets.getOrDefault(partitionId, Collections.emptyList()); return tsFileSetList.stream() .filter( s -> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 0462002f1fb..576e4e5ade5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -261,6 +261,7 @@ public class TsFileResource implements PersistentResource, Cloneable { this.tsFileID = originTsFileResource.tsFileID; this.isSeq = originTsFileResource.isSeq; this.tierLevel = originTsFileResource.tierLevel; + this.tsFileManager = originTsFileResource.tsFileManager; } public synchronized void serialize(String targetFilePath) throws IOException { @@ -1047,6 +1048,9 @@ public class TsFileResource implements PersistentResource, Cloneable { return timeIndex.checkDeviceIdExist(deviceId); } + /** + * @param deviceId IDeviceId after schema evolution + */ public boolean isSatisfied(IDeviceID deviceId, Filter timeFilter, boolean isSeq, boolean debug) { return isSatisfied(deviceId, timeFilter, isSeq, debug, Long.MAX_VALUE); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java index f1ac8edbcfa..109998dec27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java @@ -26,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -88,4 +89,11 @@ public interface SchemaEvolution extends StreamSerializable, BufferSerializable } return list; } + + static void serializeList(List<SchemaEvolution> list, OutputStream stream) throws IOException { + ReadWriteForEncodingUtils.writeVarInt(list.size(), stream); + for (SchemaEvolution evolution : list) { + evolution.serialize(stream); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java index 784312f9f9c..e4cbfb87947 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java @@ -19,6 +19,10 @@ package org.apache.iotdb.db.utils; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.NoTableNameDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; +import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; @@ -30,6 +34,7 @@ import org.apache.iotdb.db.protocol.thrift.OperationType; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.literal.BinaryLiteral; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.utils.constant.SqlConstant; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq; @@ -456,4 +461,18 @@ public class CommonUtils { } return tsBlockBuilder.toString(); } + + public static SeriesPartitionKey getSeriesPartitionKey(IDeviceID deviceID, String databaseName) { + if (databaseName != null && deviceID.isTableModel()) { + TsTable table = + DataNodeTableCache.getInstance().getTable(databaseName, deviceID.getTableName()); + boolean canAlterName = table.canAlterName(); + if (canAlterName) { + return new NoTableNameDeviceIdKey(deviceID); + } else { + return new FullDeviceIdKey(deviceID); + } + } + return new FullDeviceIdKey(deviceID); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 100c40eddcc..99507d0ec0d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -22,9 +22,12 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.tsfile.annotations.TreeModel; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; import org.slf4j.Logger; @@ -95,18 +98,21 @@ public class DataPartition extends Partition { this.dataPartitionMap = dataPartitionMap; } + @TreeModel public List<List<TTimePartitionSlot>> getTimePartitionRange( IDeviceID deviceID, Filter timeFilter) { - String storageGroup = getDatabaseNameByDevice(deviceID); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); - if (!dataPartitionMap.containsKey(storageGroup) - || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { + String databaseName = getDatabaseNameByDevice(deviceID); + // since this method retrieves database from deviceId, it must only be used by the tree model + TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(new FullDeviceIdKey(deviceID)); + if (!dataPartitionMap.containsKey(databaseName) + || !dataPartitionMap.get(databaseName).containsKey(seriesPartitionSlot)) { return Collections.emptyList(); } List<List<TTimePartitionSlot>> res = new ArrayList<>(); Map<TTimePartitionSlot, List<TRegionReplicaSet>> map = - dataPartitionMap.get(storageGroup).get(seriesPartitionSlot); + dataPartitionMap.get(databaseName).get(seriesPartitionSlot); List<TTimePartitionSlot> timePartitionSlotList = map.keySet().stream() .filter(key -> TimePartitionUtils.satisfyPartitionStartTime(timeFilter, key.startTime)) @@ -138,10 +144,13 @@ public class DataPartition extends Partition { return res; } + @TreeModel public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( final IDeviceID deviceId, final Filter timeFilter) { + // since this method retrieves database from deviceId, it must only be used by the tree model final String storageGroup = getDatabaseNameByDevice(deviceId); - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceId); + final TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(new FullDeviceIdKey(deviceId)); Map<TTimePartitionSlot, List<TRegionReplicaSet>> regionReplicaSetMap = dataPartitionMap .getOrDefault(storageGroup, Collections.emptyMap()) @@ -166,8 +175,8 @@ public class DataPartition extends Partition { * <p>The device id shall be [table, seg1, ....] */ public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( - final String database, final IDeviceID deviceId, final Filter timeFilter) { - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceId); + final String database, final SeriesPartitionKey seriesPartitionKey, final Filter timeFilter) { + final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(seriesPartitionKey); if (!dataPartitionMap.containsKey(database) || !dataPartitionMap.get(database).containsKey(seriesPartitionSlot)) { return Collections.singletonList(NOT_ASSIGNED); @@ -181,15 +190,18 @@ public class DataPartition extends Partition { .collect(toList()); } + @TreeModel public List<TRegionReplicaSet> getDataRegionReplicaSet( final IDeviceID deviceID, final TTimePartitionSlot tTimePartitionSlot) { + // since this method retrieves database from deviceId, it must only be used by the tree model final String storageGroup = getDatabaseNameByDevice(deviceID); final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> dbMap = dataPartitionMap.get(storageGroup); if (dbMap == null) { return Collections.singletonList(NOT_ASSIGNED); } - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); + final TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(new FullDeviceIdKey(deviceID)); final Map<TTimePartitionSlot, List<TRegionReplicaSet>> seriesSlotMap = dbMap.get(seriesPartitionSlot); if (seriesSlotMap == null) { @@ -206,16 +218,17 @@ public class DataPartition extends Partition { } public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting( - final IDeviceID deviceID, + final SeriesPartitionKey key, final List<TTimePartitionSlot> timePartitionSlotList, String databaseName) { if (databaseName == null) { - databaseName = getDatabaseNameByDevice(deviceID); + // must be the tree model here + databaseName = getDatabaseNameByDevice(((FullDeviceIdKey) key).getDeviceID()); } // A list of data region replica sets will store data in a same time partition. // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); + final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(key); // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are // more than 1 Regions for one timeSlot final List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>(); @@ -228,8 +241,7 @@ public class DataPartition extends Partition { if (targetRegionList == null || targetRegionList.isEmpty()) { throw new RuntimeException( String.format( - "targetRegionList is empty. device: %s, timeSlot: %s", - deviceID, timePartitionSlot)); + "targetRegionList is empty. device: %s, timeSlot: %s", key, timePartitionSlot)); } else { dataRegionReplicaSets.add(targetRegionList.get(targetRegionList.size() - 1)); } @@ -238,13 +250,16 @@ public class DataPartition extends Partition { } public TRegionReplicaSet getDataRegionReplicaSetForWriting( - final IDeviceID deviceID, final TTimePartitionSlot timePartitionSlot, String databaseName) { + final SeriesPartitionKey seriesPartitionKey, + final TTimePartitionSlot timePartitionSlot, + String databaseName) { // A list of data region replica sets will store data in a same time partition. // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); + final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(seriesPartitionKey); if (databaseName == null) { - databaseName = getDatabaseNameByDevice(deviceID); + // must be the tree model here + databaseName = getDatabaseNameByDevice(((FullDeviceIdKey) seriesPartitionKey).getDeviceID()); } final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> databasePartitionMap = dataPartitionMap.get(databaseName); @@ -261,10 +276,11 @@ public class DataPartition extends Partition { return regions.get(0); } + @TreeModel public TRegionReplicaSet getDataRegionReplicaSetForWriting( IDeviceID deviceID, TTimePartitionSlot timePartitionSlot) { return getDataRegionReplicaSetForWriting( - deviceID, timePartitionSlot, getDatabaseNameByDevice(deviceID)); + new FullDeviceIdKey(deviceID), timePartitionSlot, getDatabaseNameByDevice(deviceID)); } public String getDatabaseNameByDevice(IDeviceID deviceID) { @@ -302,15 +318,14 @@ public class DataPartition extends Partition { public void upsertDataPartition(DataPartition targetDataPartition) { requireNonNull(this.dataPartitionMap, "dataPartitionMap is null"); - for (Map.Entry< - String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + for (Entry<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> targetDbEntry : targetDataPartition.getDataPartitionMap().entrySet()) { String database = targetDbEntry.getKey(); if (dataPartitionMap.containsKey(database)) { Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sourceSeriesPartitionMap = dataPartitionMap.get(database); - for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> + for (Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> targetSeriesSlotEntry : targetDbEntry.getValue().entrySet()) { TSeriesPartitionSlot targetSeriesSlot = targetSeriesSlotEntry.getKey(); @@ -319,7 +334,7 @@ public class DataPartition extends Partition { sourceSeriesPartitionMap.get(targetSeriesSlot); Map<TTimePartitionSlot, List<TRegionReplicaSet>> targetTimePartionMap = targetSeriesSlotEntry.getValue(); - for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> targetEntry : + for (Entry<TTimePartitionSlot, List<TRegionReplicaSet>> targetEntry : targetTimePartionMap.entrySet()) { if (!sourceTimePartitionMap.containsKey(targetEntry.getKey())) { sourceTimePartitionMap.put(targetEntry.getKey(), targetEntry.getValue()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java index b94cf3d005d..5d32e33db61 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java @@ -21,8 +21,7 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; - -import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import java.util.List; @@ -42,8 +41,8 @@ public abstract class Partition { seriesSlotExecutorName, seriesPartitionSlotNum); } - public TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) { - return executor.getSeriesPartitionSlot(deviceID); + public TSeriesPartitionSlot calculateDeviceGroupId(SeriesPartitionKey key) { + return executor.getSeriesPartitionSlot(key); } public abstract List<RegionReplicaSetInfo> getDistributionInfo(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java index 96abc749865..ff56dee46fe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java @@ -22,10 +22,13 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.FullDeviceIdKey; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor.SeriesPartitionKey; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.annotations.TreeModel; import org.apache.tsfile.file.metadata.IDeviceID; import java.util.ArrayList; @@ -75,18 +78,20 @@ public class SchemaPartition extends Partition { * * <p>The device id shall be [table, seg1, ....] */ - public TRegionReplicaSet getSchemaRegionReplicaSet(String database, IDeviceID deviceID) { - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); + public TRegionReplicaSet getSchemaRegionReplicaSet(String database, SeriesPartitionKey key) { + TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(key); return schemaPartitionMap.get(database).get(seriesPartitionSlot); } + @TreeModel // [root, db, ....] public TRegionReplicaSet getSchemaRegionReplicaSet(final IDeviceID deviceID) { // A list of data region replica sets will store data in a same time partition. // We will insert data to the last set in the list. // TODO return the latest dataRegionReplicaSet for each time partition final String storageGroup = getStorageGroupByDevice(deviceID); - final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); + final TSeriesPartitionSlot seriesPartitionSlot = + calculateDeviceGroupId(new FullDeviceIdKey(deviceID)); if (schemaPartitionMap.get(storageGroup) == null) { throw new RuntimeException( new IoTDBException("Path does not exist. ", TSStatusCode.PATH_NOT_EXIST.getStatusCode())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java index d2666446e8e..8140cb259b1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java @@ -45,7 +45,12 @@ public abstract class SeriesPartitionExecutor { @TestOnly public abstract TSeriesPartitionSlot getSeriesPartitionSlot(String device); - public abstract TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID); + @TestOnly + public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + return getSeriesPartitionSlot(new FullDeviceIdKey(deviceID)); + } + + public abstract TSeriesPartitionSlot getSeriesPartitionSlot(SeriesPartitionKey deviceID); public static SeriesPartitionExecutor getSeriesPartitionExecutor( String executorName, int seriesPartitionSlotNum) { @@ -73,4 +78,50 @@ public abstract class SeriesPartitionExecutor { } } } + + public interface SeriesPartitionKey { + int segmentNum(); + + Object segment(int index); + } + + public static class FullDeviceIdKey implements SeriesPartitionKey { + private final IDeviceID deviceID; + + public FullDeviceIdKey(IDeviceID deviceID) { + this.deviceID = deviceID; + } + + @Override + public int segmentNum() { + return deviceID.segmentNum(); + } + + @Override + public Object segment(int index) { + return deviceID.segment(index); + } + + public IDeviceID getDeviceID() { + return deviceID; + } + } + + public static class NoTableNameDeviceIdKey implements SeriesPartitionKey { + private final IDeviceID deviceID; + + public NoTableNameDeviceIdKey(IDeviceID deviceID) { + this.deviceID = deviceID; + } + + @Override + public int segmentNum() { + return deviceID.segmentNum() - 1; + } + + @Override + public Object segment(int index) { + return deviceID.segment(index + 1); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java index 9390111d247..6b6c7976a35 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java @@ -22,8 +22,6 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.tsfile.file.metadata.IDeviceID; - import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; public class APHashExecutor extends SeriesPartitionExecutor { @@ -49,13 +47,13 @@ public class APHashExecutor extends SeriesPartitionExecutor { } @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + public TSeriesPartitionSlot getSeriesPartitionSlot(SeriesPartitionKey key) { int hash = 0; - int segmentNum = deviceID.segmentNum(); + int segmentNum = key.segmentNum(); int index = 0; for (int segmentID = 0; segmentID < segmentNum; segmentID++) { - Object segment = deviceID.segment(segmentID); + Object segment = key.segment(segmentID); if (segment instanceof String) { String segmentStr = (String) segment; for (int i = 0; i < segmentStr.length(); i++) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java index c039e8ddd83..9502f8007df 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java @@ -22,8 +22,6 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.tsfile.file.metadata.IDeviceID; - import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; public class BKDRHashExecutor extends SeriesPartitionExecutor { @@ -47,12 +45,12 @@ public class BKDRHashExecutor extends SeriesPartitionExecutor { } @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + public TSeriesPartitionSlot getSeriesPartitionSlot(SeriesPartitionKey key) { int hash = 0; - int segmentNum = deviceID.segmentNum(); + int segmentNum = key.segmentNum(); for (int segmentID = 0; segmentID < segmentNum; segmentID++) { - Object segment = deviceID.segment(segmentID); + Object segment = key.segment(segmentID); if (segment instanceof String) { String segmentStr = (String) segment; for (int i = 0; i < segmentStr.length(); i++) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java index 1e8c2031583..734da1ee397 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java @@ -21,8 +21,6 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.tsfile.file.metadata.IDeviceID; - import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; public class JSHashExecutor extends SeriesPartitionExecutor { @@ -46,12 +44,12 @@ public class JSHashExecutor extends SeriesPartitionExecutor { } @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + public TSeriesPartitionSlot getSeriesPartitionSlot(SeriesPartitionKey key) { int hash = BASE; - int segmentNum = deviceID.segmentNum(); + int segmentNum = key.segmentNum(); for (int segmentID = 0; segmentID < segmentNum; segmentID++) { - Object segment = deviceID.segment(segmentID); + Object segment = key.segment(segmentID); if (segment instanceof String) { String segmentStr = (String) segment; for (int i = 0; i < segmentStr.length(); i++) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java index e2143c00c0c..ec68812cac6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java @@ -21,8 +21,6 @@ package org.apache.iotdb.commons.partition.executor.hash; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.tsfile.file.metadata.IDeviceID; - import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; public class SDBMHashExecutor extends SeriesPartitionExecutor { @@ -44,12 +42,12 @@ public class SDBMHashExecutor extends SeriesPartitionExecutor { } @Override - public TSeriesPartitionSlot getSeriesPartitionSlot(IDeviceID deviceID) { + public TSeriesPartitionSlot getSeriesPartitionSlot(SeriesPartitionKey key) { int hash = 0; - int segmentNum = deviceID.segmentNum(); + int segmentNum = key.segmentNum(); for (int segmentID = 0; segmentID < segmentNum; segmentID++) { - Object segment = deviceID.segment(segmentID); + Object segment = key.segment(segmentID); if (segment instanceof String) { String segmentStr = (String) segment; for (int i = 0; i < segmentStr.length(); i++) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 7dd64481a0e..d77d5c670f4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -46,8 +46,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -71,7 +72,9 @@ public class TsTable { new TimeColumnSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP); public static final String TTL_PROPERTY = "ttl"; - public static final Set<String> TABLE_ALLOWED_PROPERTIES = Collections.singleton(TTL_PROPERTY); + public static final String ALLOW_ALTER_NAME_PROPERTY = "allow_alter_name"; + public static final Set<String> TABLE_ALLOWED_PROPERTIES = + new HashSet<>(Arrays.asList(TTL_PROPERTY, ALLOW_ALTER_NAME_PROPERTY)); private static final String OBJECT_STRING_ERROR = "When there are object fields, the %s %s shall not be '.', '..' or contain './', '.\\'."; protected String tableName; @@ -474,4 +477,9 @@ public class TsTable { + props + '}'; } + + public boolean canAlterName() { + return Boolean.parseBoolean( + getProps().getOrDefault(TsTable.ALLOW_ALTER_NAME_PROPERTY, "false")); + } } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index fcc31bdafdf..ab3d973cd2f 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -465,6 +465,12 @@ struct TDataRegionEvolveSchemaReq { 3: optional bool isGeneratedByPipe } +struct TSchemaRegionEvolveSchemaReq { + 1: required list<common.TConsensusGroupId> schemaRegionIdList + 2: required binary schemaEvolutions + 3: optional bool isGeneratedByPipe +} + struct TDeleteTimeSeriesReq { 1: required list<common.TConsensusGroupId> schemaRegionIdList 2: required binary pathPatternTree @@ -1100,6 +1106,8 @@ service IDataNodeRPCService { common.TSStatus evolveSchemaInDataRegion(TDataRegionEvolveSchemaReq req) + common.TSStatus evolveSchemaInSchemaRegion(TSchemaRegionEvolveSchemaReq req) + /** * Delete matched timeseries and remove according schema black list in target schemRegion */
