This is an automated email from the ASF dual-hosted git repository. liyuheng pushed a commit to branch lyh/multi-region-extend-remove in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa87afe30ca6fe139f0ae38826220b6cd173ab9e Author: liyuheng <[email protected]> AuthorDate: Mon Aug 18 16:44:23 2025 +0800 done --- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 4 +- .../iotdb/confignode/manager/ConfigManager.java | 4 +- .../iotdb/confignode/manager/ProcedureManager.java | 63 ++++++++++++++++++---- .../config/executor/ClusterConfigTaskExecutor.java | 4 +- .../config/metadata/region/ExtendRegionTask.java | 2 +- .../config/metadata/region/RemoveRegionTask.java | 2 +- .../db/queryengine/plan/parser/ASTVisitor.java | 11 ++-- .../plan/relational/sql/ast/ExtendRegion.java | 16 +++--- .../plan/relational/sql/ast/RemoveRegion.java | 16 +++--- .../plan/relational/sql/parser/AstBuilder.java | 10 ++-- .../metadata/region/ExtendRegionStatement.java | 10 ++-- .../metadata/region/RemoveRegionStatement.java | 10 ++-- .../db/relational/grammar/sql/RelationalSql.g4 | 4 +- .../src/main/thrift/confignode.thrift | 4 +- 14 files changed, 105 insertions(+), 55 deletions(-) diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 789fac304a6..475a40dd0ca 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -541,11 +541,11 @@ reconstructRegion ; extendRegion - : EXTEND REGION regionId=INTEGER_LITERAL TO targetDataNodeId=INTEGER_LITERAL + : EXTEND REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* TO targetDataNodeId=INTEGER_LITERAL ; removeRegion - : REMOVE REGION regionId=INTEGER_LITERAL FROM targetDataNodeId=INTEGER_LITERAL + : REMOVE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM targetDataNodeId=INTEGER_LITERAL ; verifyConnection diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 20d706a1252..3071830be60 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -2489,7 +2489,7 @@ public class ConfigManager implements IManager { public TSStatus extendRegion(TExtendRegionReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? procedureManager.extendRegion(req) + ? procedureManager.extendRegions(req) : status; } @@ -2497,7 +2497,7 @@ public class ConfigManager implements IManager { public TSStatus removeRegion(TRemoveRegionReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? procedureManager.removeRegion(req) + ? procedureManager.removeRegions(req) : status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 9a5feb0b407..3dcb01de273 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -176,6 +176,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -824,7 +825,7 @@ public class ProcedureManager { failMessage = String.format( "Target DataNode %s already contains region %s", - targetDataNode.getDataNodeId(), req.getRegionId()); + targetDataNode.getDataNodeId(), regionId); } if (failMessage != null) { @@ -1125,14 +1126,60 @@ public class ProcedureManager { return RpcUtils.SUCCESS_STATUS; } - public TSStatus extendRegion(TExtendRegionReq req) { + public TSStatus extendRegions(TExtendRegionReq req) { + return processExtendOrRemoveRegions( + req.getRegionId(), req, this::extendOneRegion, TSStatusCode.EXTEND_REGION_ERROR); + } + + public TSStatus removeRegions(TRemoveRegionReq req) { + return processExtendOrRemoveRegions( + req.getRegionId(), req, this::removeOneRegion, TSStatusCode.REMOVE_REGION_PEER_ERROR); + } + + private <R> TSStatus processExtendOrRemoveRegions( + Iterable<Integer> regionIds, + R req, + BiFunction<Integer, R, TSStatus> regionAction, + TSStatusCode errorCode) { + TSStatus resp = new TSStatus(); + StringBuilder messageBuilder = new StringBuilder(); + + int total = 0, success = 0; + for (int regionId : regionIds) { + total++; + TSStatus subStatus = regionAction.apply(regionId, req); + if (subStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + messageBuilder.append("region ").append(regionId).append(": Successfully submitted\n"); + success++; + } else { + messageBuilder + .append("region ") + .append(regionId) + .append(": ") + .append(subStatus.getMessage()) + .append('\n'); + } + resp.addToSubStatus(subStatus); + } + + messageBuilder.insert( + 0, + String.format( + "Total regions: %d, successfully submitted: %d, failed to submit: %d\n", + total, success, total - success)); + + resp.setCode( + total == success ? TSStatusCode.SUCCESS_STATUS.getStatusCode() : errorCode.getStatusCode()); + resp.setMessage(messageBuilder.toString()); + return resp; + } + + private TSStatus extendOneRegion(int theRegionId, TExtendRegionReq req) { try (AutoCloseableLock ignoredLock = AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) { TConsensusGroupId regionId; Optional<TConsensusGroupId> optional = - configManager - .getPartitionManager() - .generateTConsensusGroupIdByRegionId(req.getRegionId()); + configManager.getPartitionManager().generateTConsensusGroupIdByRegionId(theRegionId); if (optional.isPresent()) { regionId = optional.get(); } else { @@ -1171,14 +1218,12 @@ public class ProcedureManager { } } - public TSStatus removeRegion(TRemoveRegionReq req) { + private TSStatus removeOneRegion(int theRegionId, TRemoveRegionReq req) { try (AutoCloseableLock ignoredLock = AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) { TConsensusGroupId regionId; Optional<TConsensusGroupId> optional = - configManager - .getPartitionManager() - .generateTConsensusGroupIdByRegionId(req.getRegionId()); + configManager.getPartitionManager().generateTConsensusGroupIdByRegionId(theRegionId); if (optional.isPresent()) { regionId = optional.get(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d5dc521292e..9e57532ad26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -3213,7 +3213,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TExtendRegionReq req = new TExtendRegionReq( - extendRegionTask.getStatement().getRegionId(), + extendRegionTask.getStatement().getRegionIds(), extendRegionTask.getStatement().getDataNodeId(), extendRegionTask.getModel()); final TSStatus status = configNodeClient.extendRegion(req); @@ -3236,7 +3236,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TRemoveRegionReq req = new TRemoveRegionReq( - removeRegionTask.getStatement().getRegionId(), + removeRegionTask.getStatement().getRegionIds(), removeRegionTask.getStatement().getDataNodeId(), removeRegionTask.getModel()); final TSStatus status = configNodeClient.removeRegion(req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/ExtendRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/ExtendRegionTask.java index 1ffce010927..8635c085ee1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/ExtendRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/ExtendRegionTask.java @@ -40,7 +40,7 @@ public class ExtendRegionTask implements IConfigTask { public ExtendRegionTask(ExtendRegion extendRegion) { this.statement = - new ExtendRegionStatement(extendRegion.getRegionId(), extendRegion.getDataNodeId()); + new ExtendRegionStatement(extendRegion.getRegionIds(), extendRegion.getDataNodeId()); this.model = Model.TABLE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/RemoveRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/RemoveRegionTask.java index 86d4bafc9db..e1d32bff986 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/RemoveRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/RemoveRegionTask.java @@ -40,7 +40,7 @@ public class RemoveRegionTask implements IConfigTask { public RemoveRegionTask(RemoveRegion removeRegion) { this.statement = - new RemoveRegionStatement(removeRegion.getRegionId(), removeRegion.getDataNodeId()); + new RemoveRegionStatement(removeRegion.getRegionIds(), removeRegion.getDataNodeId()); this.model = Model.TABLE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 706d14f052c..aa8a8621784 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -289,6 +289,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.stream.Collectors.toList; import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; @@ -4277,14 +4278,16 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { @Override public Statement visitExtendRegion(IoTDBSqlParser.ExtendRegionContext ctx) { - return new ExtendRegionStatement( - Integer.parseInt(ctx.regionId.getText()), Integer.parseInt(ctx.targetDataNodeId.getText())); + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); + return new ExtendRegionStatement(regionIds, Integer.parseInt(ctx.targetDataNodeId.getText())); } @Override public Statement visitRemoveRegion(IoTDBSqlParser.RemoveRegionContext ctx) { - return new RemoveRegionStatement( - Integer.parseInt(ctx.regionId.getText()), Integer.parseInt(ctx.targetDataNodeId.getText())); + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); + return new RemoveRegionStatement(regionIds, Integer.parseInt(ctx.targetDataNodeId.getText())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExtendRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExtendRegion.java index 687777c8558..b905e775130 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExtendRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExtendRegion.java @@ -26,12 +26,12 @@ import java.util.Objects; public class ExtendRegion extends Statement { - private final int regionId; + private final List<Integer> regionIds; private final int dataNodeId; - public ExtendRegion(int regionId, int dataNodeId) { + public ExtendRegion(List<Integer> regionIds, int dataNodeId) { super(null); - this.regionId = regionId; + this.regionIds = regionIds; this.dataNodeId = dataNodeId; } @@ -42,7 +42,7 @@ public class ExtendRegion extends Statement { @Override public int hashCode() { - return Objects.hash(ExtendRegion.class, regionId, dataNodeId); + return Objects.hash(ExtendRegion.class, regionIds, dataNodeId); } @Override @@ -54,12 +54,12 @@ public class ExtendRegion extends Statement { return false; } ExtendRegion another = (ExtendRegion) obj; - return regionId == another.regionId && dataNodeId == another.dataNodeId; + return regionIds == another.regionIds && dataNodeId == another.dataNodeId; } @Override public String toString() { - return String.format("extend region %d to datanode %d", regionId, dataNodeId); + return String.format("extend region %s to datanode %d", regionIds, dataNodeId); } @Override @@ -67,8 +67,8 @@ public class ExtendRegion extends Statement { return visitor.visitExtendRegion(this, context); } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getDataNodeId() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveRegion.java index 8c14433167a..eb63323b0d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveRegion.java @@ -26,12 +26,12 @@ import java.util.Objects; public class RemoveRegion extends Statement { - private final int regionId; + private final List<Integer> regionIds; private final int dataNodeId; - public RemoveRegion(int regionId, int dataNodeId) { + public RemoveRegion(List<Integer> regionId, int dataNodeId) { super(null); - this.regionId = regionId; + this.regionIds = regionId; this.dataNodeId = dataNodeId; } @@ -42,7 +42,7 @@ public class RemoveRegion extends Statement { @Override public int hashCode() { - return Objects.hash(RemoveRegion.class, regionId, dataNodeId); + return Objects.hash(RemoveRegion.class, regionIds, dataNodeId); } @Override @@ -54,12 +54,12 @@ public class RemoveRegion extends Statement { return false; } RemoveRegion another = (RemoveRegion) obj; - return regionId == another.regionId && dataNodeId == another.dataNodeId; + return regionIds == another.regionIds && dataNodeId == another.dataNodeId; } @Override public String toString() { - return String.format("remove region %d from %d", regionId, dataNodeId); + return String.format("remove region %d from %d", regionIds, dataNodeId); } @Override @@ -67,8 +67,8 @@ public class RemoveRegion extends Statement { return visitor.visitRemoveRegion(this, context); } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getDataNodeId() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index b445f23bff9..a69eead9a40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -1438,14 +1438,16 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { @Override public Node visitExtendRegionStatement(RelationalSqlParser.ExtendRegionStatementContext ctx) { - return new ExtendRegion( - Integer.parseInt(ctx.regionId.getText()), Integer.parseInt(ctx.targetDataNodeId.getText())); + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); + return new ExtendRegion(regionIds, Integer.parseInt(ctx.targetDataNodeId.getText())); } @Override public Node visitRemoveRegionStatement(RelationalSqlParser.RemoveRegionStatementContext ctx) { - return new RemoveRegion( - Integer.parseInt(ctx.regionId.getText()), Integer.parseInt(ctx.targetDataNodeId.getText())); + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); + return new RemoveRegion(regionIds, Integer.parseInt(ctx.targetDataNodeId.getText())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/ExtendRegionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/ExtendRegionStatement.java index 0048a789f95..591c62c4b6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/ExtendRegionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/ExtendRegionStatement.java @@ -32,17 +32,17 @@ import java.util.List; public class ExtendRegionStatement extends Statement implements IConfigStatement { - private final int regionId; + private final List<Integer> regionIds; private final int dataNodeId; - public ExtendRegionStatement(int regionId, int dataNodeId) { + public ExtendRegionStatement(List<Integer> regionIds, int dataNodeId) { super(); - this.regionId = regionId; + this.regionIds = regionIds; this.dataNodeId = dataNodeId; } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getDataNodeId() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/RemoveRegionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/RemoveRegionStatement.java index aa185ad627e..f4d3b94c682 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/RemoveRegionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/RemoveRegionStatement.java @@ -32,17 +32,17 @@ import java.util.List; public class RemoveRegionStatement extends Statement implements IConfigStatement { - private final int regionId; + private final List<Integer> regionIds; private final int dataNodeId; - public RemoveRegionStatement(int regionId, int dataNodeId) { + public RemoveRegionStatement(List<Integer> regionIds, int dataNodeId) { super(); - this.regionId = regionId; + this.regionIds = regionIds; this.dataNodeId = dataNodeId; } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getDataNodeId() { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 8aa8ac380d2..317944fddf5 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -577,11 +577,11 @@ reconstructRegionStatement ; extendRegionStatement - : EXTEND REGION regionId=INTEGER_VALUE TO targetDataNodeId=INTEGER_VALUE + : EXTEND REGION regionIds+=INTEGER_VALUE (COMMA regionIds+=INTEGER_VALUE)* TO targetDataNodeId=INTEGER_VALUE ; removeRegionStatement - : REMOVE REGION regionId=INTEGER_VALUE FROM targetDataNodeId=INTEGER_VALUE + : REMOVE REGION regionIds+=INTEGER_VALUE (COMMA regionIds+=INTEGER_VALUE)* FROM targetDataNodeId=INTEGER_VALUE ; removeDataNodeStatement diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 2a5682926df..99e11d8116d 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -322,13 +322,13 @@ struct TReconstructRegionReq { } struct TExtendRegionReq { - 1: required i32 regionId + 1: required list<i32> regionId 2: required i32 dataNodeId 3: required common.Model model } struct TRemoveRegionReq { - 1: required i32 regionId + 1: required list<i32> regionId 2: required i32 dataNodeId 3: required common.Model model }
