This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 112bcd828d5 ConfigNode supports filter region groups via specified
database and time range (#17545)
112bcd828d5 is described below
commit 112bcd828d55f6b08645a8195b03f0d5b30c9fb2
Author: Yongzao <[email protected]>
AuthorDate: Thu Apr 23 17:25:40 2026 +0800
ConfigNode supports filter region groups via specified database and time
range (#17545)
---
.../it/partition/IoTDBPartitionGetterIT.java | 80 ++++++++++++++++++
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../read/region/GetRegionGroupsByTimePlan.java | 98 ++++++++++++++++++++++
.../partition/GetRegionGroupsByTimeResp.java | 56 +++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 10 +++
.../apache/iotdb/confignode/manager/IManager.java | 9 ++
.../manager/partition/PartitionManager.java | 16 ++++
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../partition/DatabasePartitionTable.java | 11 +++
.../persistence/partition/PartitionInfo.java | 14 ++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 7 ++
.../request/ConfigPhysicalPlanSerDeTest.java | 10 +++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 9 ++
.../src/main/thrift/confignode.thrift | 14 ++++
15 files changed, 342 insertions(+)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index 37fec43648e..688bd682493 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -33,6 +33,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -519,6 +521,84 @@ public class IoTDBPartitionGetterIT {
}
}
+ @Test
+ public void testGetRegionGroupsByTime() throws Exception {
+ final String sg0 = "root.sg0";
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ // Get all region groups covering the full time range
+ TGetRegionGroupsByTimeReq req = new TGetRegionGroupsByTimeReq(sg0, 0L,
Long.MAX_VALUE);
+ TGetRegionGroupsByTimeResp resp = client.getRegionGroupsByTime(req);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.getStatus().getCode());
+ Assert.assertNotNull(resp.getRegionReplicaSets());
+ Assert.assertFalse(resp.getRegionReplicaSets().isEmpty());
+ int allRegionGroupCount = resp.getRegionReplicaSetsSize();
+
+ // Each replica set should have testReplicationFactor replicas
+ resp.getRegionReplicaSets()
+ .forEach(
+ replicaSet -> {
+ Assert.assertEquals(testReplicationFactor,
replicaSet.getDataNodeLocationsSize());
+ Assert.assertEquals(
+ TConsensusGroupType.DataRegion,
replicaSet.getRegionId().getType());
+ });
+
+ // Query with a single time slot range should return a subset
+ TGetRegionGroupsByTimeReq singleSlotReq =
+ new TGetRegionGroupsByTimeReq(sg0, 0L, testTimePartitionInterval -
1);
+ TGetRegionGroupsByTimeResp singleSlotResp =
client.getRegionGroupsByTime(singleSlotReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
singleSlotResp.getStatus().getCode());
+ Assert.assertNotNull(singleSlotResp.getRegionReplicaSets());
+ Assert.assertFalse(singleSlotResp.getRegionReplicaSets().isEmpty());
+ Assert.assertTrue(singleSlotResp.getRegionReplicaSetsSize() <=
allRegionGroupCount);
+
+ // Query with a disjoint time range should return empty result
+ TGetRegionGroupsByTimeReq disjointReq =
+ new TGetRegionGroupsByTimeReq(
+ sg0,
+ testTimePartitionSlotsNum * testTimePartitionInterval * 2,
+ testTimePartitionSlotsNum * testTimePartitionInterval * 3);
+ TGetRegionGroupsByTimeResp disjointResp =
client.getRegionGroupsByTime(disjointReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
disjointResp.getStatus().getCode());
+ Assert.assertTrue(
+ disjointResp.getRegionReplicaSets() == null
+ || disjointResp.getRegionReplicaSets().isEmpty());
+
+ // Query non-existent database should return empty result
+ TGetRegionGroupsByTimeReq nonExistReq =
+ new TGetRegionGroupsByTimeReq("root.nonexistent", 0L,
Long.MAX_VALUE);
+ TGetRegionGroupsByTimeResp nonExistResp =
client.getRegionGroupsByTime(nonExistReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
nonExistResp.getStatus().getCode());
+ Assert.assertTrue(
+ nonExistResp.getRegionReplicaSets() == null
+ || nonExistResp.getRegionReplicaSets().isEmpty());
+
+ // Verify consistency: union of per-slot queries should equal full-range
query
+ Set<TConsensusGroupId> unionRegionIds = new HashSet<>();
+ for (long t = 0; t < testTimePartitionSlotsNum; t++) {
+ TGetRegionGroupsByTimeReq perSlotReq =
+ new TGetRegionGroupsByTimeReq(
+ sg0, t * testTimePartitionInterval, t *
testTimePartitionInterval);
+ TGetRegionGroupsByTimeResp perSlotResp =
client.getRegionGroupsByTime(perSlotReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
perSlotResp.getStatus().getCode());
+ if (perSlotResp.getRegionReplicaSets() != null) {
+ perSlotResp
+ .getRegionReplicaSets()
+ .forEach(replicaSet ->
unionRegionIds.add(replicaSet.getRegionId()));
+ }
+ }
+ Set<TConsensusGroupId> allRegionIds = new HashSet<>();
+ resp.getRegionReplicaSets().forEach(replicaSet ->
allRegionIds.add(replicaSet.getRegionId()));
+ Assert.assertEquals(allRegionIds, unionRegionIds);
+ }
+ }
+
@Test
public void testGetSchemaNodeManagementPartition() throws Exception {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index ffe333b56dd..275a0a50261 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.request;
import
org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import
org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
import
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
@@ -248,6 +249,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case RemoveRegionLocation:
plan = new RemoveRegionLocationPlan();
break;
+ case GetRegionGroupsByTime:
+ plan = new GetRegionGroupsByTimePlan();
+ break;
case OfferRegionMaintainTasks:
plan = new OfferRegionMaintainTasksPlan();
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index fe04b93d9ad..3eee7caf37a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -71,6 +71,7 @@ public enum ConfigPhysicalPlanType {
CountTimeSlotList((short) 310),
AddRegionLocation((short) 311),
RemoveRegionLocation((short) 312),
+ GetRegionGroupsByTime((short) 313),
/** Partition. */
GetSchemaPartition((short) 400),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java
new file mode 100644
index 00000000000..d946022771d
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read.region;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class GetRegionGroupsByTimePlan extends ConfigPhysicalReadPlan {
+
+ private String database;
+
+ private TTimePartitionSlot startTimeSlot;
+
+ private TTimePartitionSlot endTimeSlot;
+
+ public GetRegionGroupsByTimePlan() {
+ super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
+ }
+
+ public GetRegionGroupsByTimePlan(
+ final String database, final long startTime, final long endTime) {
+ super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
+ this.database = database;
+ this.startTimeSlot = TimePartitionUtils.getTimePartitionSlot(startTime);
+ this.endTimeSlot = TimePartitionUtils.getTimePartitionSlot(endTime);
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public TTimePartitionSlot getStartTimeSlot() {
+ return startTimeSlot;
+ }
+
+ public TTimePartitionSlot getEndTimeSlot() {
+ return endTimeSlot;
+ }
+
+ @Override
+ protected void serializeImpl(final DataOutputStream stream) throws
IOException {
+ stream.writeShort(getType().getPlanType());
+ BasicStructureSerDeUtil.write(database, stream);
+ stream.writeLong(startTimeSlot.getStartTime());
+ stream.writeLong(endTimeSlot.getStartTime());
+ }
+
+ @Override
+ protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
+ database = BasicStructureSerDeUtil.readString(buffer);
+ startTimeSlot = new TTimePartitionSlot(buffer.getLong());
+ endTimeSlot = new TTimePartitionSlot(buffer.getLong());
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final GetRegionGroupsByTimePlan that = (GetRegionGroupsByTimePlan) o;
+ return Objects.equals(database, that.database)
+ && Objects.equals(startTimeSlot, that.startTimeSlot)
+ && Objects.equals(endTimeSlot, that.endTimeSlot);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(database, startTimeSlot, endTimeSlot);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java
new file mode 100644
index 00000000000..3166ec30df1
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.Set;
+
+public class GetRegionGroupsByTimeResp implements DataSet {
+
+ private final TSStatus status;
+
+ private final Set<TRegionReplicaSet> regionReplicaSets;
+
+ public GetRegionGroupsByTimeResp(
+ final TSStatus status, final Set<TRegionReplicaSet> regionReplicaSets) {
+ this.status = status;
+ this.regionReplicaSets = regionReplicaSets;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public TGetRegionGroupsByTimeResp convertToRpcResp() {
+ TGetRegionGroupsByTimeResp resp = new TGetRegionGroupsByTimeResp();
+ resp.setStatus(status);
+
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.setRegionReplicaSets(regionReplicaSets);
+ }
+
+ return resp;
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 182dc2f9fb2..b13acd8d958 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -199,6 +199,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -2569,6 +2571,14 @@ public class ConfigManager implements IManager {
: new TGetSeriesSlotListResp(status);
}
+ @Override
+ public TGetRegionGroupsByTimeResp
getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? partitionManager.getRegionGroupsByTime(req).convertToRpcResp()
+ : new TGetRegionGroupsByTimeResp(status);
+ }
+
@Override
public TSStatus migrateRegion(TMigrateRegionReq req) {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 02c82164595..50601dff9ac 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -118,6 +118,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -854,6 +856,13 @@ public interface IManager {
*/
TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req);
+ /**
+ * Get DataRegion groups that overlap a time range for the given database.
+ *
+ * @return TGetRegionGroupsByTimeResp.
+ */
+ TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq
req);
+
TSStatus migrateRegion(TMigrateRegionReq req);
TSStatus reconstructRegion(TReconstructRegionReq req);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 576d805c786..607d670006c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateS
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
@@ -60,6 +61,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import
org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
import
org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp;
import
org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
+import
org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp;
@@ -82,6 +84,7 @@ import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDelete
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
@@ -1180,6 +1183,19 @@ public class PartitionManager {
}
}
+ public GetRegionGroupsByTimeResp getRegionGroupsByTime(final
TGetRegionGroupsByTimeReq req) {
+ final GetRegionGroupsByTimePlan plan =
+ new GetRegionGroupsByTimePlan(req.getDatabase(), req.getStartTime(),
req.getEndTime());
+ try {
+ return (GetRegionGroupsByTimeResp) getConsensusManager().read(plan);
+ } catch (final ConsensusException e) {
+ LOGGER.warn(CONSENSUS_READ_ERROR, e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return new GetRegionGroupsByTimeResp(res, Collections.emptySet());
+ }
+ }
+
public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
long startTime = req.isSetStartTime() ? req.getStartTime() :
Long.MIN_VALUE;
long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 6c9351e881a..7253eae3279 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPar
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
@@ -355,6 +356,8 @@ public class ConfigPlanExecutor {
return partitionInfo.countTimeSlotList((CountTimeSlotListPlan) req);
case GetSeriesSlotList:
return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
+ case GetRegionGroupsByTime:
+ return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan)
req);
case SHOW_CQ:
return cqInfo.showCQ();
case ShowExternalService:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 1d2d776c69b..5a80364f033 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -501,6 +501,17 @@ public class DatabasePartitionTable {
}
}
+ public Set<TRegionReplicaSet> getRegionGroupsByTime(
+ TTimePartitionSlot startTimeSlot, TTimePartitionSlot endTimeSlot) {
+ List<TConsensusGroupId> regionIds =
+ dataPartitionTable.getRegionId(new TSeriesPartitionSlot(-1),
startTimeSlot, endTimeSlot);
+ return regionIds.stream()
+ .distinct()
+ .filter(regionGroupMap::containsKey)
+ .map(id -> regionGroupMap.get(id).getReplicaSet())
+ .collect(Collectors.toSet());
+ }
+
public List<TTimePartitionSlot> getTimeSlotList(
TSeriesPartitionSlot seriesSlotId, TConsensusGroupId regionId, long
startTime, long endTime) {
return dataPartitionTable.getTimeSlotList(seriesSlotId, regionId,
startTime, endTime);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index b907527416b..5c2c93daeab 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataParti
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
+import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
@@ -53,6 +54,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMai
import
org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
import
org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp;
import
org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
+import
org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp;
import
org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp;
@@ -1105,6 +1107,18 @@ public class PartitionInfo implements SnapshotProcessor {
.collect(Collectors.toList()));
}
+ public DataSet getRegionGroupsByTime(GetRegionGroupsByTimePlan plan) {
+ if (!isDatabaseExisted(plan.getDatabase())) {
+ return new GetRegionGroupsByTimeResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new
HashSet<>());
+ }
+ DatabasePartitionTable databasePartitionTable =
databasePartitionTables.get(plan.getDatabase());
+ return new GetRegionGroupsByTimeResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ databasePartitionTable.getRegionGroupsByTime(
+ plan.getStartTimeSlot(), plan.getEndTimeSlot()));
+ }
+
/**
* Get the timePartition of the specific Database or seriesSlotId(device) or
regionId.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 4d01f3770c2..52f330bcdce 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -167,6 +167,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -1333,6 +1335,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.getSeriesSlotList(req);
}
+ @Override
+ public TGetRegionGroupsByTimeResp
getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) {
+ return configManager.getRegionGroupsByTime(req);
+ }
+
@Override
public TSStatus migrateRegion(TMigrateRegionReq req) {
return configManager.migrateRegion(req);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index f203bd5d77f..5ff3fb1f33b 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.UDFType;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelationalPlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan;
@@ -2062,4 +2063,13 @@ public class ConfigPhysicalPlanSerDeTest {
(RemoveRegionLocationPlan)
ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer());
Assert.assertEquals(plan, dePlan);
}
+
+ @Test
+ public void GetRegionGroupsByTimePlanTest() throws IOException {
+ GetRegionGroupsByTimePlan plan0 = new
GetRegionGroupsByTimePlan("root.sg0", 0L, 604800000L);
+ GetRegionGroupsByTimePlan plan1 =
+ (GetRegionGroupsByTimePlan)
+ ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+ Assert.assertEquals(plan0, plan1);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index e2c04caedfb..e0001c448bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -125,6 +125,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
@@ -1302,6 +1304,13 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.getSeriesSlotList(req), resp ->
!updateConfigNodeLeader(resp.status));
}
+ @Override
+ public TGetRegionGroupsByTimeResp
getRegionGroupsByTime(TGetRegionGroupsByTimeReq req)
+ throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.getRegionGroupsByTime(req), resp ->
!updateConfigNodeLeader(resp.status));
+ }
+
@Override
public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 92312ee81a3..b1af203f4a1 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -313,6 +313,17 @@ struct TCountTimeSlotListResp {
2: optional i64 count
}
+struct TGetRegionGroupsByTimeReq {
+ 1: required string database
+ 2: required i64 startTime
+ 3: required i64 endTime
+}
+
+struct TGetRegionGroupsByTimeResp {
+ 1: required common.TSStatus status
+ 2: optional set<common.TRegionReplicaSet> regionReplicaSets
+}
+
struct TGetSeriesSlotListReq {
1: required string database
2: required common.TConsensusGroupType type
@@ -1971,6 +1982,9 @@ service IConfigNodeRPCService {
/** Get the given database's assigned SeriesSlots */
TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req)
+ /** Get a database's DataRegion groups that overlap a time range */
+ TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq
req)
+
// ====================================================
// CQ
// ====================================================