This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a690f627f15 Balance per-database replicas in PGP and Greedy region allocators (#17714)
a690f627f15 is described below
commit a690f627f158b597148a3d1900722345d9c91e73
Author: Yongzao <[email protected]>
AuthorDate: Tue May 19 15:56:24 2026 +0800
Balance per-database replicas in PGP and Greedy region allocators (#17714)
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../IoTDBPerDatabaseRegionGroupAllocationIT.java | 192 +++++++++++++++++
.../manager/load/balancer/RegionBalancer.java | 4 +-
.../region/GreedyRegionGroupAllocator.java | 43 +++-
.../PartiteGraphPlacementRegionGroupAllocator.java | 159 +++++++++++---
.../procedure/env/RemoveDataNodeHandler.java | 2 +-
.../region/GreedyRegionGroupAllocatorTest.java | 76 +++++++
...titeGraphPlacementRegionGroupAllocatorTest.java | 231 +++++++++++++++++++++
.../conf/iotdb-system.properties.template | 9 +
12 files changed, 696 insertions(+), 40 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 1578e8a3fbc..4852b9d116e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -203,6 +203,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setRegionGroupAllocatePolicy(String
regionGroupAllocatePolicy) {
+ setProperty("region_group_allocate_policy", regionGroupAllocatePolicy);
+ return this;
+ }
+
@Override
public CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy) {
setProperty("schema_region_group_extension_policy",
schemaRegionGroupExtensionPolicy);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index c7d8becb373..582c9a049e4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -202,6 +202,13 @@ public class MppSharedCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setRegionGroupAllocatePolicy(String
regionGroupAllocatePolicy) {
+ cnConfig.setRegionGroupAllocatePolicy(regionGroupAllocatePolicy);
+ dnConfig.setRegionGroupAllocatePolicy(regionGroupAllocatePolicy);
+ return this;
+ }
+
@Override
public CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy) {
cnConfig.setSchemaRegionGroupExtensionPolicy(schemaRegionGroupExtensionPolicy);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 4e0b0d2d727..48c157e957b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -145,6 +145,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setRegionGroupAllocatePolicy(String
regionGroupAllocatePolicy) {
+ return this;
+ }
+
@Override
public CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy) {
return this;
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index fb4c4f56ba5..dc21234e2ba 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -72,6 +72,8 @@ public interface CommonConfig {
CommonConfig setIoTConsensusV2Mode(String iotConsensusV2Mode);
+ CommonConfig setRegionGroupAllocatePolicy(String regionGroupAllocatePolicy);
+
CommonConfig setSchemaRegionGroupExtensionPolicy(String
schemaRegionGroupExtensionPolicy);
CommonConfig setDefaultSchemaRegionGroupNumPerDatabase(int
schemaRegionGroupPerDatabase);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java
new file mode 100644
index 00000000000..94b67fa8f53
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBPerDatabaseRegionGroupAllocationIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Verifies that DataRegion allocators distribute each database's replicas
evenly across DataNodes.
+ * Regression for the case where {@code
PartiteGraphPlacementRegionGroupAllocator} ignored {@code
+ * databaseAllocatedRegionGroups} and ended up with one DataNode holding many
replicas of one
+ * database and few of another.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBPerDatabaseRegionGroupAllocationIT {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBPerDatabaseRegionGroupAllocationIT.class);
+
+ private static final int TEST_DATA_NODE_NUM = 4;
+ private static final int TEST_REPLICATION_FACTOR = 2;
+ private static final int TEST_DATABASE_NUM = 2;
+ private static final int TEST_DATA_REGION_GROUP_NUM_PER_DATABASE = 4;
+ private static final String DATABASE_PREFIX = "root.db";
+ private static final long TEST_TIME_PARTITION_INTERVAL = 604_800_000L;
+
+ private void initCluster(String regionGroupAllocatePolicy) {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setRegionGroupAllocatePolicy(regionGroupAllocatePolicy)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setDataReplicationFactor(TEST_REPLICATION_FACTOR)
+ .setDataRegionGroupExtensionPolicy("CUSTOM")
+
.setDefaultDataRegionGroupNumPerDatabase(TEST_DATA_REGION_GROUP_NUM_PER_DATABASE)
+ // Avoid auto leader balancing rearranging anything during the test
+ .setEnableAutoLeaderBalanceForIoTConsensus(false);
+ EnvFactory.getEnv().initClusterEnvironment(1, TEST_DATA_NODE_NUM);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testPgpPolicyPerDbReplicaBalance() throws Exception {
+ initCluster("PGP");
+ runPerDbBalanceCheck("PGP");
+ }
+
+ @Test
+ public void testGcrPolicyPerDbReplicaBalance() throws Exception {
+ initCluster("GCR");
+ runPerDbBalanceCheck("GCR");
+ }
+
+ @Test
+ public void testGreedyPolicyPerDbReplicaBalance() throws Exception {
+ initCluster("GREEDY");
+ runPerDbBalanceCheck("GREEDY");
+ }
+
+ private void runPerDbBalanceCheck(String policy) throws Exception {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // Create databases and trigger DataRegion materialization
+ for (int i = 0; i < TEST_DATABASE_NUM; i++) {
+ String currentDatabase = DATABASE_PREFIX + i;
+ TSStatus status = client.setDatabase(new
TDatabaseSchema(currentDatabase));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ currentDatabase,
+ 0,
+ TEST_DATA_REGION_GROUP_NUM_PER_DATABASE,
+ 0,
+ 1,
+ TEST_TIME_PARTITION_INTERVAL);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(new
TDataPartitionReq(partitionSlotsMap));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ }
+
+ // Collect DataRegion replicas grouped by database and DataNode
+ TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+ List<TRegionInfo> dataRegionInfoList =
+ showRegionResp.getRegionInfoList().stream()
+ .filter(r ->
!r.database.startsWith(SystemConstant.SYSTEM_DATABASE))
+ .filter(r ->
!r.database.startsWith(SystemConstant.AUDIT_DATABASE))
+ // Only consider DataRegion since SchemaRegion is allocated by a
separate path
+ .filter(r -> r.getConsensusGroupId().getType() ==
TConsensusGroupType.DataRegion)
+ .collect(Collectors.toList());
+
+ Map<String, Map<Integer, Integer>> perDbReplicaCount = new TreeMap<>();
+ Map<Integer, Integer> globalReplicaCount = new TreeMap<>();
+ for (TRegionInfo info : dataRegionInfoList) {
+ perDbReplicaCount
+ .computeIfAbsent(info.getDatabase(), k -> new TreeMap<>())
+ .merge(info.getDataNodeId(), 1, Integer::sum);
+ globalReplicaCount.merge(info.getDataNodeId(), 1, Integer::sum);
+ }
+
+ int expectedPerDbPerDn =
+ TEST_DATA_REGION_GROUP_NUM_PER_DATABASE * TEST_REPLICATION_FACTOR /
TEST_DATA_NODE_NUM;
+ int expectedGlobalPerDn =
+ TEST_DATABASE_NUM
+ * TEST_DATA_REGION_GROUP_NUM_PER_DATABASE
+ * TEST_REPLICATION_FACTOR
+ / TEST_DATA_NODE_NUM;
+
+ for (int i = 0; i < TEST_DATABASE_NUM; i++) {
+ String currentDatabase = DATABASE_PREFIX + i;
+ Map<Integer, Integer> dnReplicaCount =
perDbReplicaCount.get(currentDatabase);
+ Assert.assertNotNull("No DataRegion replicas found for " +
currentDatabase, dnReplicaCount);
+ LOGGER.info("[{}] db {} replicas per DN: {}", policy, currentDatabase,
dnReplicaCount);
+ Assert.assertEquals(
+ "policy=" + policy + " db=" + currentDatabase + " should cover all
DataNodes",
+ TEST_DATA_NODE_NUM,
+ dnReplicaCount.size());
+ for (Map.Entry<Integer, Integer> entry : dnReplicaCount.entrySet()) {
+ Assert.assertEquals(
+ "policy=" + policy + " db=" + currentDatabase + " dn=" +
entry.getKey(),
+ expectedPerDbPerDn,
+ (int) entry.getValue());
+ }
+ }
+
+ LOGGER.info("[{}] global replicas per DN: {}", policy,
globalReplicaCount);
+ Assert.assertEquals(
+ "policy=" + policy + " global allocation should cover all DataNodes",
+ TEST_DATA_NODE_NUM,
+ globalReplicaCount.size());
+ for (Map.Entry<Integer, Integer> entry : globalReplicaCount.entrySet()) {
+ Assert.assertEquals(
+ "policy=" + policy + " global dn=" + entry.getKey(),
+ expectedGlobalPerDn,
+ (int) entry.getValue());
+ }
+ }
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 10864e9fba9..1075fc1573d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -58,7 +58,7 @@ public class RegionBalancer {
case GREEDY:
this.regionGroupAllocator = new GreedyRegionGroupAllocator();
break;
- case PGR:
+ case PGP:
this.regionGroupAllocator = new
PartiteGraphPlacementRegionGroupAllocator();
break;
case GCR:
@@ -160,6 +160,6 @@ public class RegionBalancer {
public enum RegionGroupAllocatePolicy {
GREEDY,
GCR,
- PGR
+ PGP
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index 33859233d90..4e940795418 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -45,12 +45,15 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
public int dataNodeId;
public int regionCount;
+ public int databaseRegionCount;
public double freeDiskSpace;
public int randomWeight;
- public DataNodeEntry(int dataNodeId, int regionCount, double
freeDiskSpace) {
+ public DataNodeEntry(
+ int dataNodeId, int regionCount, int databaseRegionCount, double
freeDiskSpace) {
this.dataNodeId = dataNodeId;
this.regionCount = regionCount;
+ this.databaseRegionCount = databaseRegionCount;
this.freeDiskSpace = freeDiskSpace;
this.randomWeight = RANDOM.nextInt();
}
@@ -58,12 +61,15 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
@Override
public int compareTo(DataNodeEntry other) {
if (this.regionCount != other.regionCount) {
- return this.regionCount - other.regionCount;
- } else if (this.freeDiskSpace != other.freeDiskSpace) {
- return (int) (other.freeDiskSpace - this.freeDiskSpace);
- } else {
- return this.randomWeight - other.randomWeight;
+ return Integer.compare(this.regionCount, other.regionCount);
}
+ if (this.databaseRegionCount != other.databaseRegionCount) {
+ return Integer.compare(this.databaseRegionCount,
other.databaseRegionCount);
+ }
+ if (this.freeDiskSpace != other.freeDiskSpace) {
+ return Double.compare(other.freeDiskSpace, this.freeDiskSpace);
+ }
+ return Integer.compare(this.randomWeight, other.randomWeight);
}
}
@@ -75,9 +81,13 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
List<TRegionReplicaSet> databaseAllocatedRegionGroups,
int replicationFactor,
TConsensusGroupId consensusGroupId) {
- // Build weightList order by number of regions allocated asc
+ // Build weightList ordered by (regionCount asc, databaseRegionCount asc,
freeDiskSpace desc)
List<TDataNodeLocation> weightList =
- buildWeightList(availableDataNodeMap, freeDiskSpaceMap,
allocatedRegionGroups);
+ buildWeightList(
+ availableDataNodeMap,
+ freeDiskSpaceMap,
+ allocatedRegionGroups,
+ databaseAllocatedRegionGroups);
return new TRegionReplicaSet(
consensusGroupId,
weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
@@ -99,7 +109,8 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
private List<TDataNodeLocation> buildWeightList(
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
Map<Integer, Double> freeDiskSpaceMap,
- List<TRegionReplicaSet> allocatedRegionGroups) {
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
// Map<DataNodeId, Region count>
Map<Integer, Integer> regionCounter = new
HashMap<>(availableDataNodeMap.size());
@@ -111,6 +122,19 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
dataNodeLocation ->
regionCounter.merge(dataNodeLocation.getDataNodeId(),
1, Integer::sum)));
+ // Map<DataNodeId, Region count within the same Database>
+ Map<Integer, Integer> databaseRegionCounter = new
HashMap<>(availableDataNodeMap.size());
+ if (databaseAllocatedRegionGroups != null) {
+ databaseAllocatedRegionGroups.forEach(
+ regionReplicaSet ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ databaseRegionCounter.merge(
+ dataNodeLocation.getDataNodeId(), 1,
Integer::sum)));
+ }
+
/* Construct priority map */
List<DataNodeEntry> entryList = new ArrayList<>();
availableDataNodeMap.forEach(
@@ -119,6 +143,7 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
new DataNodeEntry(
datanodeId,
regionCounter.getOrDefault(datanodeId, 0),
+ databaseRegionCounter.getOrDefault(datanodeId, 0),
freeDiskSpaceMap.getOrDefault(datanodeId, 0d))));
// Sort weightList
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
index f7be6554776..d036c6eda0a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocator.java
@@ -26,10 +26,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
-import
org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator.DataNodeEntry;
-
-import org.apache.tsfile.utils.Pair;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -40,6 +38,7 @@ import java.util.stream.Collectors;
public class PartiteGraphPlacementRegionGroupAllocator implements
IRegionGroupAllocator {
+ private static final SecureRandom RANDOM = new SecureRandom();
private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR =
new GreedyRegionGroupAllocator();
@@ -48,15 +47,17 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
private int regionPerDataNode;
private int dataNodeNum;
- // The number of allocated Regions in each DataNode
+ // The number of allocated Regions in each DataNode (fake-id indexed)
private int[] regionCounter;
- // The number of edges in current cluster
+ // The number of allocated Regions in each DataNode within the same Database
(fake-id indexed)
+ private int[] databaseRegionCounter;
+ // Whether there exists a region with both i and j as replicas (fake-id
indexed, binary)
private int[][] combinationCounter;
private Map<Integer, Integer> fakeToRealIdMap;
private int alphaDataNodeNum;
- // Pair<combinationSum, RegionSum>
- Pair<Integer, Integer> bestValue;
+ // The best valuation found so far
+ private Value bestValue;
private int[] bestAlphaNodes;
@Override
@@ -71,13 +72,17 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
?
ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
:
ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode();
- prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+ prepare(
+ replicationFactor,
+ availableDataNodeMap,
+ allocatedRegionGroups,
+ databaseAllocatedRegionGroups);
// Select alpha nodes set
for (int i = 0; i < subGraphCount; i++) {
subGraphSearch(i, freeDiskSpaceMap);
}
- if (bestValue.left == Integer.MAX_VALUE) {
+ if (bestValue.regionSum == Integer.MAX_VALUE) {
// Use greedy allocator as alternative if no alpha nodes set is found
return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
availableDataNodeMap,
@@ -132,7 +137,8 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
private void prepare(
int replicationFactor,
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
- List<TRegionReplicaSet> allocatedRegionGroups) {
+ List<TRegionReplicaSet> allocatedRegionGroups,
+ List<TRegionReplicaSet> databaseAllocatedRegionGroups) {
this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ?
0 : 1);
this.replicationFactor = replicationFactor;
@@ -150,9 +156,11 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
realToFakeIdMap.put(dataNodeIdList.get(i), i);
}
- // Compute regionCounter and combinationCounter
+ // Compute regionCounter, databaseRegionCounter and combinationCounter
this.regionCounter = new int[dataNodeNum];
Arrays.fill(regionCounter, 0);
+ this.databaseRegionCounter = new int[dataNodeNum];
+ Arrays.fill(databaseRegionCounter, 0);
this.combinationCounter = new int[dataNodeNum][dataNodeNum];
for (int i = 0; i < dataNodeNum; i++) {
Arrays.fill(combinationCounter[i], 0);
@@ -160,42 +168,64 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
List<TDataNodeLocation> dataNodeLocations =
regionReplicaSet.getDataNodeLocations();
for (int i = 0; i < dataNodeLocations.size(); i++) {
- int fakeIId =
realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId());
+ Integer fakeIId =
realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId());
+ if (fakeIId == null) {
+ // Skip nodes that are no longer available
+ continue;
+ }
regionCounter[fakeIId]++;
for (int j = i + 1; j < dataNodeLocations.size(); j++) {
- int fakeJId =
realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId());
+ Integer fakeJId =
realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId());
+ if (fakeJId == null) {
+ continue;
+ }
combinationCounter[fakeIId][fakeJId] = 1;
combinationCounter[fakeJId][fakeIId] = 1;
}
}
}
+ if (databaseAllocatedRegionGroups != null) {
+ for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups)
{
+ for (TDataNodeLocation location :
regionReplicaSet.getDataNodeLocations()) {
+ Integer fakeId = realToFakeIdMap.get(location.getDataNodeId());
+ if (fakeId != null) {
+ databaseRegionCounter[fakeId]++;
+ }
+ }
+ }
+ }
// Reset the optimal result
this.alphaDataNodeNum = replicationFactor / 2 + 1;
- this.bestValue = new Pair<>(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ this.bestValue = Value.worst();
this.bestAlphaNodes = new int[alphaDataNodeNum];
}
- private Pair<Integer, Integer> valuation(int[] nodes) {
- int edgeSum = 0;
+ private Value valuation(int[] nodes) {
int regionSum = 0;
+ int databaseRegionSum = 0;
+ int edgeSum = 0;
for (int iota : nodes) {
+ regionSum += regionCounter[iota];
+ databaseRegionSum += databaseRegionCounter[iota];
for (int kappa : nodes) {
edgeSum += combinationCounter[iota][kappa];
}
- regionSum += regionCounter[iota];
}
- return new Pair<>(edgeSum, regionSum);
+ return new Value(regionSum, databaseRegionSum, edgeSum);
}
private void subGraphSearch(int firstIndex, Map<Integer, Double>
freeDiskSpaceMap) {
- List<DataNodeEntry> entryList = new ArrayList<>();
+ List<PgpDataNodeEntry> entryList = new ArrayList<>();
for (int index = firstIndex; index < dataNodeNum; index += subGraphCount) {
// Prune: skip filled DataNodes
if (regionCounter[index] < regionPerDataNode) {
entryList.add(
- new DataNodeEntry(
- index, regionCounter[index],
freeDiskSpaceMap.get(fakeToRealIdMap.get(index))));
+ new PgpDataNodeEntry(
+ index,
+ regionCounter[index],
+ databaseRegionCounter[index],
+ freeDiskSpaceMap.get(fakeToRealIdMap.get(index))));
}
}
if (entryList.size() < alphaDataNodeNum) {
@@ -209,9 +239,8 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
}
for (int i = alphaDataNodeNum - 1; i < entryList.size(); i++) {
alphaNodes[alphaDataNodeNum - 1] = entryList.get(i).dataNodeId;
- Pair<Integer, Integer> currentValue = valuation(alphaNodes);
- if (currentValue.left < bestValue.left
- || (currentValue.left.equals(bestValue.left) && currentValue.right <
bestValue.right)) {
+ Value currentValue = valuation(alphaNodes);
+ if (currentValue.compareTo(bestValue) < 0) {
bestValue = currentValue;
System.arraycopy(alphaNodes, 0, bestAlphaNodes, 0, alphaDataNodeNum);
}
@@ -228,16 +257,15 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
continue;
}
int selectedDataNode = -1;
- Pair<Integer, Integer> tmpValue = new Pair<>(Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ Value tmpValue = Value.worst();
for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
if (regionCounter[i] >= regionPerDataNode) {
// Pruning: skip filled DataNodes
continue;
}
tmpNodes[alphaDataNodeNum] = i;
- Pair<Integer, Integer> currentValue = valuation(tmpNodes);
- if (currentValue.left < tmpValue.left
- || (currentValue.left.equals(tmpValue.left) && currentValue.right
< tmpValue.right)) {
+ Value currentValue = valuation(tmpNodes);
+ if (currentValue.compareTo(tmpValue) < 0) {
tmpValue = currentValue;
selectedDataNode = i;
}
@@ -249,4 +277,79 @@ public class PartiteGraphPlacementRegionGroupAllocator implements IRegionGroupAl
}
return betaNodes;
}
+
+ /**
+ * Valuation for a candidate alpha (or alpha ∪ beta-candidate) set. Smaller
is better. Comparison
+ * priority:
+ *
+ * <ol>
+ * <li>{@code regionSum} — global per-DN region count (balance the whole
cluster)
+ * <li>{@code databaseRegionSum} — per-(database, DN) region count
(balance each database)
+ * <li>{@code edgeSum} — 2-Region combination scatter (refines PGP's
scatter property)
+ * </ol>
+ */
+ private static class Value implements Comparable<Value> {
+
+ private final int regionSum;
+ private final int databaseRegionSum;
+ private final int edgeSum;
+
+ private Value(int regionSum, int databaseRegionSum, int edgeSum) {
+ this.regionSum = regionSum;
+ this.databaseRegionSum = databaseRegionSum;
+ this.edgeSum = edgeSum;
+ }
+
+ private static Value worst() {
+ return new Value(Integer.MAX_VALUE, Integer.MAX_VALUE,
Integer.MAX_VALUE);
+ }
+
+ @Override
+ public int compareTo(Value other) {
+ if (regionSum != other.regionSum) {
+ return Integer.compare(regionSum, other.regionSum);
+ }
+ if (databaseRegionSum != other.databaseRegionSum) {
+ return Integer.compare(databaseRegionSum, other.databaseRegionSum);
+ }
+ return Integer.compare(edgeSum, other.edgeSum);
+ }
+ }
+
+ /**
+ * Pre-sort entry for selecting alpha nodes inside a sub-graph. Sort
priority matches {@link
+ * Value}: regionCount first, databaseRegionCount second, then freeDiskSpace
(descending) and a
+ * random tie-breaker.
+ */
+ private static class PgpDataNodeEntry implements
Comparable<PgpDataNodeEntry> {
+
+ private final int dataNodeId;
+ private final int regionCount;
+ private final int databaseRegionCount;
+ private final double freeDiskSpace;
+ private final int randomWeight;
+
+ private PgpDataNodeEntry(
+ int dataNodeId, int regionCount, int databaseRegionCount, double
freeDiskSpace) {
+ this.dataNodeId = dataNodeId;
+ this.regionCount = regionCount;
+ this.databaseRegionCount = databaseRegionCount;
+ this.freeDiskSpace = freeDiskSpace;
+ this.randomWeight = RANDOM.nextInt();
+ }
+
+ @Override
+ public int compareTo(PgpDataNodeEntry other) {
+ if (this.regionCount != other.regionCount) {
+ return Integer.compare(this.regionCount, other.regionCount);
+ }
+ if (this.databaseRegionCount != other.databaseRegionCount) {
+ return Integer.compare(this.databaseRegionCount,
other.databaseRegionCount);
+ }
+ if (this.freeDiskSpace != other.freeDiskSpace) {
+ return Double.compare(other.freeDiskSpace, this.freeDiskSpace);
+ }
+ return Integer.compare(this.randomWeight, other.randomWeight);
+ }
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
index 9b1fd5c6dbc..5b505ec001b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java
@@ -87,7 +87,7 @@ public class RemoveDataNodeHandler {
case GREEDY:
this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
break;
- case PGR:
+ case PGP:
this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator();
break;
case GCR:
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java
index b0dd6769f2e..55111efa09f 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocatorTest.java
@@ -29,11 +29,13 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +44,80 @@ public class GreedyRegionGroupAllocatorTest {
private static final GreedyRegionGroupAllocator ALLOCATOR = new
GreedyRegionGroupAllocator();
private static final int TEST_REPLICATION_FACTOR = 3;
+  /**
+   * Multi-database regression for the greedy allocator: the replicas of every database, and the
+   * replicas of the cluster as a whole, must be spread evenly over all DataNodes.
+   *
+   * <p>Setup: 10 DataNodes, 2 databases, 30 RegionGroups per database, replication factor 3,
+   * with RegionGroup creation interleaved between the databases. Every (database, DataNode) pair
+   * must then hold exactly 30 × 3 / 10 = 9 replicas, and every DataNode exactly 2 × 9 = 18
+   * replicas in total.
+   */
+  @Test
+  public void testPerDatabaseBalance() {
+    final int nodeCount = 10;
+    final int dbCount = 2;
+    final int groupsPerDb = 30;
+    final int replicationFactor = 3;
+
+    // A fixed seed keeps the generated free-space values reproducible across runs.
+    final Map<Integer, TDataNodeConfiguration> nodes = new ConcurrentHashMap<>();
+    final Map<Integer, Double> freeSpace = new ConcurrentHashMap<>();
+    final Random seeded = new Random(0);
+    for (int nodeId = 1; nodeId <= nodeCount; nodeId++) {
+      final TDataNodeLocation location = new TDataNodeLocation().setDataNodeId(nodeId);
+      nodes.put(nodeId, new TDataNodeConfiguration().setLocation(location));
+      freeSpace.put(nodeId, seeded.nextDouble());
+    }
+
+    final List<TRegionReplicaSet> clusterGroups = new ArrayList<>();
+    final Map<Integer, List<TRegionReplicaSet>> groupsByDb = new TreeMap<>();
+    for (int db = 0; db < dbCount; db++) {
+      groupsByDb.put(db, new ArrayList<>());
+    }
+
+    // Create RegionGroups round by round: one new RegionGroup per database in every round.
+    int nextRegionId = 0;
+    for (int round = 0; round < groupsPerDb; round++) {
+      for (int db = 0; db < dbCount; db++) {
+        final List<TRegionReplicaSet> dbGroups = groupsByDb.get(db);
+        final TConsensusGroupId groupId =
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, nextRegionId++);
+        final TRegionReplicaSet placed =
+            ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                nodes, freeSpace, clusterGroups, dbGroups, replicationFactor, groupId);
+        clusterGroups.add(placed);
+        dbGroups.add(placed);
+      }
+    }
+
+    // Per-database check: each DataNode must hold exactly the ideal share of each database.
+    final int expectedPerDbPerNode = groupsPerDb * replicationFactor / nodeCount;
+    for (int db = 0; db < dbCount; db++) {
+      final Map<Integer, Integer> dbCounts = new HashMap<>();
+      for (TRegionReplicaSet group : groupsByDb.get(db)) {
+        group
+            .getDataNodeLocations()
+            .forEach(location -> dbCounts.merge(location.getDataNodeId(), 1, Integer::sum));
+      }
+      for (int nodeId = 1; nodeId <= nodeCount; nodeId++) {
+        final int actual = dbCounts.getOrDefault(nodeId, 0);
+        Assert.assertEquals(
+            "db " + db + " dn " + nodeId + " replica count", expectedPerDbPerNode, actual);
+      }
+    }
+
+    // Cluster-wide check: each DataNode must hold exactly the ideal share of all replicas.
+    final Map<Integer, Integer> clusterCounts = new HashMap<>();
+    for (TRegionReplicaSet group : clusterGroups) {
+      group
+          .getDataNodeLocations()
+          .forEach(location -> clusterCounts.merge(location.getDataNodeId(), 1, Integer::sum));
+    }
+    final int expectedPerNode = dbCount * groupsPerDb * replicationFactor / nodeCount;
+    for (int nodeId = 1; nodeId <= nodeCount; nodeId++) {
+      Assert.assertEquals(
+          "dn " + nodeId + " global replica count",
+          expectedPerNode,
+          clusterCounts.getOrDefault(nodeId, 0).intValue());
+    }
+  }
+
@Test
public void testEvenDistribution() {
/* Construct input data */
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java
new file mode 100644
index 00000000000..df086659d7f
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphPlacementRegionGroupAllocatorTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+/**
+ * Balance tests for {@link PartiteGraphPlacementRegionGroupAllocator}: RegionGroups of several
+ * databases are allocated in an interleaved order, and the resulting replica distribution is
+ * checked both per database and cluster-wide.
+ */
+public class PartiteGraphPlacementRegionGroupAllocatorTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PartiteGraphPlacementRegionGroupAllocatorTest.class);
+
+  private static final PartiteGraphPlacementRegionGroupAllocator ALLOCATOR =
+      new PartiteGraphPlacementRegionGroupAllocator();
+
+  private static final int TEST_DATA_NODE_NUM = 20;
+  private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP =
+      new HashMap<>();
+  private static final Map<Integer, Double> FREE_SPACE_MAP = new HashMap<>();
+
+  /** Registers DataNodes 1..TEST_DATA_NODE_NUM with seeded, reproducible free-space values. */
+  @BeforeClass
+  public static void setUp() {
+    Random random = new Random(0);
+    for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) {
+      AVAILABLE_DATA_NODE_MAP.put(
+          i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i)));
+      FREE_SPACE_MAP.put(i, random.nextDouble());
+    }
+  }
+
+  /**
+   * Regression for the reported scenario: 20 DataNodes, 2 databases, 60 DataRegions per database,
+   * replication factor 3.
+   *
+   * <p>With the legacy PGP (database-blind) implementation, the per-(database, DataNode) replica
+   * count could range from 2 to 15: some DataNodes ended up holding 15 replicas of one database
+   * but only 3 of the other. The database-aware implementation must spread each database's
+   * replicas evenly; ideally every DataNode holds 60 × 3 / 20 = 9 replicas of each database.
+   */
+  @Test
+  public void testTwoDatabasesPerDbBalance() {
+    runMultiDatabaseTest(2, 3, 60);
+  }
+
+  /** Multi-database (4 dbs) interleaved allocation with replication factor 3. */
+  @Test
+  public void testMultiDatabasePerDbBalance() {
+    // 40 RGs per db × rf 3 = 120 replicas per db → 6 per DN (integer avg)
+    runMultiDatabaseTest(4, 3, 40);
+  }
+
+  /** Multi-database (3 dbs) interleaved allocation with replication factor 2. */
+  @Test
+  public void testTwoFactorMultiDatabaseBalance() {
+    // 40 RGs per db × rf 2 = 80 replicas per db → 4 per DN (integer avg)
+    runMultiDatabaseTest(3, 2, 40);
+  }
+
+  /** Multi-database (3 dbs) interleaved allocation with replication factor 5. */
+  @Test
+  public void testFiveFactorMultiDatabaseBalance() {
+    // 20 RGs per db × rf 5 = 100 replicas per db → 5 per DN (integer avg)
+    runMultiDatabaseTest(3, 5, 20);
+  }
+
+  /**
+   * Allocates {@code regionGroupsPerDatabase} RegionGroups for each of {@code databaseNum}
+   * databases, creating one RegionGroup per database in every round, and then asserts that:
+   *
+   * <ul>
+   *   <li>every RegionGroup has {@code replicationFactor} replicas on distinct DataNodes;
+   *   <li>within each database, per-DataNode replica counts differ by at most one;
+   *   <li>across the whole cluster, per-DataNode replica counts differ by at most one.
+   * </ul>
+   */
+  private void runMultiDatabaseTest(
+      int databaseNum, int replicationFactor, int regionGroupsPerDatabase) {
+    List<TRegionReplicaSet> allocatedRegionGroups = new ArrayList<>();
+    Map<Integer, List<TRegionReplicaSet>> databaseAllocatedRegionGroups = new TreeMap<>();
+    for (int db = 0; db < databaseNum; db++) {
+      databaseAllocatedRegionGroups.put(db, new ArrayList<>());
+    }
+
+    int regionId = 0;
+    for (int round = 0; round < regionGroupsPerDatabase; round++) {
+      for (int db = 0; db < databaseNum; db++) {
+        TRegionReplicaSet newRegionGroup =
+            ALLOCATOR.generateOptimalRegionReplicasDistribution(
+                AVAILABLE_DATA_NODE_MAP,
+                FREE_SPACE_MAP,
+                allocatedRegionGroups,
+                databaseAllocatedRegionGroups.get(db),
+                replicationFactor,
+                new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId++));
+        // Balance counts alone cannot detect a short or duplicated placement, so check each one.
+        assertValidReplicaSet(newRegionGroup, replicationFactor);
+        allocatedRegionGroups.add(newRegionGroup);
+        databaseAllocatedRegionGroups.get(db).add(newRegionGroup);
+      }
+    }
+
+    String context = "rf=" + replicationFactor + ", dbs=" + databaseNum;
+    for (int db = 0; db < databaseNum; db++) {
+      assertSpreadAtMostOne(
+          context + ", db " + db,
+          countReplicasPerDataNode(databaseAllocatedRegionGroups.get(db)));
+    }
+    assertSpreadAtMostOne(context + ", global", countReplicasPerDataNode(allocatedRegionGroups));
+  }
+
+  /**
+   * Asserts that {@code replicaSet} holds exactly {@code replicationFactor} replicas, each on a
+   * different DataNode.
+   */
+  private static void assertValidReplicaSet(TRegionReplicaSet replicaSet, int replicationFactor) {
+    List<TDataNodeLocation> locations = replicaSet.getDataNodeLocations();
+    Assert.assertEquals("Replica count of " + replicaSet, replicationFactor, locations.size());
+    long distinctDataNodeNum =
+        locations.stream().map(TDataNodeLocation::getDataNodeId).distinct().count();
+    Assert.assertEquals(
+        "Replicas must be placed on distinct DataNodes: " + replicaSet,
+        replicationFactor,
+        distinctDataNodeNum);
+  }
+
+  /**
+   * Asserts that the replica counts of DataNodes 1..TEST_DATA_NODE_NUM differ by at most one. A
+   * DataNode absent from {@code replicaCounts} is treated as holding zero replicas.
+   *
+   * @param context prefix for the log line and the assertion message
+   * @param replicaCounts replica count keyed by DataNode id
+   */
+  private static void assertSpreadAtMostOne(String context, Map<Integer, Integer> replicaCounts) {
+    int minCount = Integer.MAX_VALUE;
+    int maxCount = Integer.MIN_VALUE;
+    for (int dnId = 1; dnId <= TEST_DATA_NODE_NUM; dnId++) {
+      int count = replicaCounts.getOrDefault(dnId, 0);
+      minCount = Math.min(minCount, count);
+      maxCount = Math.max(maxCount, count);
+    }
+    LOGGER.info("{}: per-DN replica min={}, max={}", context, minCount, maxCount);
+    Assert.assertTrue(
+        context + ": replica spread too wide: max=" + maxCount + ", min=" + minCount,
+        maxCount - minCount <= 1);
+  }
+
+  /** Counts how many replicas of {@code regionGroups} each DataNode holds, keyed by DataNode id. */
+  private static Map<Integer, Integer> countReplicasPerDataNode(
+      List<TRegionReplicaSet> regionGroups) {
+    Map<Integer, Integer> counter = new HashMap<>();
+    for (TRegionReplicaSet rg : regionGroups) {
+      for (TDataNodeLocation loc : rg.getDataNodeLocations()) {
+        counter.merge(loc.getDataNodeId(), 1, Integer::sum);
+      }
+    }
+    return counter;
+  }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 623dffee28b..4d7b5bcea87 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -679,6 +679,15 @@ default_data_region_group_num_per_database=2
# Datatype: Integer
data_region_per_data_node=0
+# The policy used to place the replicas of a new RegionGroup onto DataNodes.
+# The following policies are currently supported:
+# 1. GREEDY (each replica is placed on the DataNode that currently holds the
+#    fewest regions; ties are broken in a database-aware way)
+# 2. GCR (Greedy + CopySet; balances cluster-wide regions, per-database regions,
+#    and 2-region scatter)
+# 3. PGP (Partite-Graph Placement, based on the PGP paper; with database-aware
+#    balance)
+# effectiveMode: restart
+# Datatype: String
+# region_group_allocate_policy=GCR
+
# Whether to enable auto leader balance for Ratis consensus protocol.
# The ConfigNode-leader will balance the leader of Ratis-RegionGroups by
leader_distribution_policy if set true.
# Notice: Default is false because the Ratis is unstable for this function.