This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d529ee80f4fd2c718e1920fef9c120205de8f1ff Author: Yongzao <[email protected]> AuthorDate: Wed Aug 27 09:42:07 2025 +0800 Shuffle data partition allocation strategy (#16260) (#16279) --- .../it/env/cluster/config/MppCommonConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 6 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + ...T.java => IoTDBPartitionInheritStrategyIT.java} | 4 +- .../partition/IoTDBPartitionShuffleStrategyIT.java | 140 +++++++++++++++ .../iotdb/confignode/conf/ConfigNodeConfig.java | 10 ++ .../confignode/conf/ConfigNodeDescriptor.java | 4 + .../manager/load/balancer/PartitionBalancer.java | 194 +++++++++++++++------ 9 files changed, 319 insertions(+), 52 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 26e7d8df357..891c785a33d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -363,6 +363,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) { + setProperty("data_partition_allocation_strategy", dataPartitionAllocationStrategy); + return this; + } + @Override public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { setProperty("series_partition_executor_class", seriesPartitionExecutorClass); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 6ad723a8013..da08bec4c55 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -361,6 +361,12 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) { + cnConfig.setDataPartitionAllocationStrategy(dataPartitionAllocationStrategy); + return this; + } + @Override public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48f090943f5..6ba2b3c2f71 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -254,6 +254,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) { + return this; + } + @Override public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 5f39e911e39..f17b923442e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -116,6 +116,8 @@ public interface CommonConfig { CommonConfig setSeriesSlotNum(int seriesSlotNum); + CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy); + CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass); CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java similarity index 98% rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java index 2efd5db7a0b..91d34b2d0c9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java @@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) -public class IoTDBPartitionInheritPolicyIT { +public class IoTDBPartitionInheritStrategyIT { private static final String testDataRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; @@ -91,7 +91,7 @@ public class IoTDBPartitionInheritPolicyIT { } @Test - public void testDataPartitionInheritPolicy() throws Exception { + public void testDataPartitionInheritStrategy() throws Exception { final long baseStartTime = 1000; Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new ConcurrentHashMap<>(); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java new file mode 100644 index 00000000000..70f170caa11 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionShuffleStrategyIT.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBPartitionShuffleStrategyIT { + + private static final String testDataRegionConsensusProtocolClass = + ConsensusFactory.RATIS_CONSENSUS; + private static final int testReplicationFactor = 1; + private static final String testDataPartitionAllocationStrategy = "SHUFFLE"; + private static final int testSeriesSlotNum = 1000; + private static final long testTimePartitionInterval = 604800000; + private static final double testDataRegionPerDataNode = 5.0; + + private static final String database = "root.database"; + private static final int testTimePartitionSlotsNum = 100; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass) + .setDataReplicationFactor(testReplicationFactor) + .setTimePartitionInterval(testTimePartitionInterval) + .setSeriesSlotNum(testSeriesSlotNum) + .setDataPartitionAllocationStrategy(testDataPartitionAllocationStrategy) + .setDataRegionPerDataNode(testDataRegionPerDataNode); + + // Init 1C1D environment + EnvFactory.getEnv().initClusterEnvironment(1, 1); + + // Set Database + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + TSStatus status = client.setDatabase(new TDatabaseSchema(database)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testDataPartitionShuffleStrategy() throws Exception { + List<Integer> randomTimeSlotList = new ArrayList<>(); + for (int i = 0; i < testTimePartitionSlotsNum; i++) { + randomTimeSlotList.add(i); + } + Collections.shuffle(randomTimeSlotList); + for (int timeSlotId : randomTimeSlotList) { + // To test the shuffle strategy, we merely need to use a random time slot order + ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry( + database, 0, testSeriesSlotNum, timeSlotId, timeSlotId + 1, testTimePartitionInterval); + } + TDataPartitionTableResp dataPartitionTableResp; + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = + client.getDataPartitionTable( + new TDataPartitionReq( + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, + 0, + testSeriesSlotNum, + 0, + testTimePartitionSlotsNum, + testTimePartitionInterval))); + } + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> + partitionTable = dataPartitionTableResp.getDataPartitionTable(); + for (long currentStartTime = testTimePartitionInterval; + currentStartTime < testTimePartitionInterval * testTimePartitionSlotsNum; + currentStartTime += testTimePartitionInterval) { + TTimePartitionSlot precedingTimeSlot = + new TTimePartitionSlot(currentStartTime - testTimePartitionInterval); + TTimePartitionSlot currentTimeSlot = new TTimePartitionSlot(currentStartTime); + for (int seriesSlotId = 0; seriesSlotId < testSeriesSlotNum; seriesSlotId++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(seriesSlotId); + List<TConsensusGroupId> precedingRegionGroupIds = + partitionTable.get(database).get(seriesPartitionSlot).get(precedingTimeSlot); + List<TConsensusGroupId> currentRegionGroupIds = + partitionTable.get(database).get(seriesPartitionSlot).get(currentTimeSlot); + Assert.assertEquals(precedingRegionGroupIds.size(), currentRegionGroupIds.size()); + for (int i = 0; i < precedingRegionGroupIds.size(); i++) { + // Ensure that the RegionGroupId is different in two adjacent TimePartitionSlots + Assert.assertNotEquals(precedingRegionGroupIds.get(i), currentRegionGroupIds.get(i)); + } + } + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 50c8f5bc9a0..03d12ee46a6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -82,6 +82,8 @@ public class ConfigNodeConfig { private String seriesPartitionExecutorClass = "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor"; + private String dataPartitionAllocationStrategy = "INHERIT"; + /** The policy of extension SchemaRegionGroup for each Database. */ private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy = RegionGroupExtensionPolicy.AUTO; @@ -421,6 +423,14 @@ public class ConfigNodeConfig { this.seriesPartitionExecutorClass = seriesPartitionExecutorClass; } + public String getDataPartitionAllocationStrategy() { + return dataPartitionAllocationStrategy; + } + + public void setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) { + this.dataPartitionAllocationStrategy = dataPartitionAllocationStrategy; + } + public int getCnRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index b9efee34e28..1209e669055 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -191,6 +191,10 @@ public class ConfigNodeDescriptor { .getProperty("series_partition_executor_class", conf.getSeriesPartitionExecutorClass()) .trim()); + conf.setDataPartitionAllocationStrategy( + properties.getProperty( + "data_partition_allocation_strategy", conf.getDataPartitionAllocationStrategy())); + conf.setConfigNodeConsensusProtocolClass( properties .getProperty( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index 4d540c1ab2a..7d0f6e67126 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; import org.apache.iotdb.commons.structure.BalanceTreeMap; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.manager.IManager; @@ -39,10 +40,12 @@ import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -56,12 +59,36 @@ public class PartitionBalancer { private final IManager configManager; - // Map<DatabaseName, DataPartitionPolicyTable> + private final DataPartitionAllocationStrategy dataPartitionAllocationStrategy; + // Map<DatabaseName, DataPartitionPolicyTable>, employed by INHERIT allocation strategy private final Map<String, DataPartitionPolicyTable> dataPartitionPolicyTableMap; + private enum DataPartitionAllocationStrategy { + // The INHERIT strategy tries to allocate adjacent DataPartitions as + // consistent as possible, while ensuring load balancing. + INHERIT, + // The SHUFFLE strategy tries to allocate adjacent DataPartitions as + // inconsistent as possible, note the result could be unbalanced. + SHUFFLE + } + public PartitionBalancer(IManager configManager) { this.configManager = configManager; this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>(); + switch (ConfigNodeDescriptor.getInstance().getConf().getDataPartitionAllocationStrategy()) { + case "INHERIT": + this.dataPartitionAllocationStrategy = DataPartitionAllocationStrategy.INHERIT; + break; + case "SHUFFLE": + this.dataPartitionAllocationStrategy = DataPartitionAllocationStrategy.SHUFFLE; + break; + default: + LOGGER.warn( + "Unknown DataPartition allocation strategy {}, using INHERIT strategy by default.", + ConfigNodeDescriptor.getInstance().getConf().getDataPartitionAllocationStrategy()); + this.dataPartitionAllocationStrategy = DataPartitionAllocationStrategy.INHERIT; + break; + } } /** @@ -152,56 +179,25 @@ public class PartitionBalancer { List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue().getTimePartitionSlots(); timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime)); - - for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { - - // 1. The historical DataPartition will try to inherit successor DataPartition first - TConsensusGroupId successor = - getPartitionManager() - .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); - if (successor != null && availableDataRegionGroupCounter.containsKey(successor)) { - seriesPartitionTable.putDataPartition(timePartitionSlot, successor); - availableDataRegionGroupCounter.put( - successor, availableDataRegionGroupCounter.get(successor) + 1); - continue; - } - - // 2. Assign DataPartition base on the DataAllotTable - TConsensusGroupId allotGroupId = - allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot); - if (availableDataRegionGroupCounter.containsKey(allotGroupId)) { - seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId); - availableDataRegionGroupCounter.put( - allotGroupId, availableDataRegionGroupCounter.get(allotGroupId) + 1); - continue; - } - - // 3. The allotDataRegionGroup is unavailable, - // try to inherit predecessor DataPartition - TConsensusGroupId predecessor = - getPartitionManager() - .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); - if (predecessor != null && availableDataRegionGroupCounter.containsKey(predecessor)) { - seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor); - availableDataRegionGroupCounter.put( - predecessor, availableDataRegionGroupCounter.get(predecessor) + 1); - continue; - } - - // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions - // If the above DataRegionGroups are unavailable - TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue(); - seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId); - availableDataRegionGroupCounter.put( - greedyGroupId, availableDataRegionGroupCounter.get(greedyGroupId) + 1); - LOGGER.warn( - "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.", - seriesPartitionSlot, - timePartitionSlot, - greedyGroupId, - allotGroupId); + switch (dataPartitionAllocationStrategy) { + case INHERIT: + inheritAllocationStrategy( + database, + allotTable, + seriesPartitionSlot, + timePartitionSlots, + availableDataRegionGroupCounter, + seriesPartitionTable); + break; + case SHUFFLE: + shuffleAllocationStrategy( + database, + seriesPartitionSlot, + timePartitionSlots, + availableDataRegionGroupCounter, + seriesPartitionTable); + break; } - dataPartitionTable .getDataPartitionMap() .put(seriesPartitionEntry.getKey(), seriesPartitionTable); @@ -215,6 +211,104 @@ public class PartitionBalancer { return result; } + private void inheritAllocationStrategy( + String database, + DataPartitionPolicyTable allotTable, + TSeriesPartitionSlot seriesPartitionSlot, + List<TTimePartitionSlot> timePartitionSlots, + BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter, + SeriesPartitionTable seriesPartitionTable) { + for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { + + // 1. The historical DataPartition will try to inherit successor DataPartition first + TConsensusGroupId successor = + getPartitionManager() + .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (successor != null && availableDataRegionGroupCounter.containsKey(successor)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, successor); + availableDataRegionGroupCounter.put( + successor, availableDataRegionGroupCounter.get(successor) + 1); + continue; + } + + // 2. Assign DataPartition base on the DataAllotTable + TConsensusGroupId allotGroupId = + allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot); + if (availableDataRegionGroupCounter.containsKey(allotGroupId)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId); + availableDataRegionGroupCounter.put( + allotGroupId, availableDataRegionGroupCounter.get(allotGroupId) + 1); + continue; + } + + // 3. The allotDataRegionGroup is unavailable, + // try to inherit predecessor DataPartition + TConsensusGroupId predecessor = + getPartitionManager() + .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (predecessor != null && availableDataRegionGroupCounter.containsKey(predecessor)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor); + availableDataRegionGroupCounter.put( + predecessor, availableDataRegionGroupCounter.get(predecessor) + 1); + continue; + } + + // 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions + // If the above DataRegionGroups are unavailable + TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue(); + seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId); + availableDataRegionGroupCounter.put( + greedyGroupId, availableDataRegionGroupCounter.get(greedyGroupId) + 1); + LOGGER.warn( + "[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.", + seriesPartitionSlot, + timePartitionSlot, + greedyGroupId, + allotGroupId); + } + } + + private void shuffleAllocationStrategy( + String database, + TSeriesPartitionSlot seriesPartitionSlot, + List<TTimePartitionSlot> timePartitionSlots, + BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter, + SeriesPartitionTable seriesPartitionTable) { + final Random random = new Random(); + List<TConsensusGroupId> availableDataRegionGroups = + new ArrayList<>(availableDataRegionGroupCounter.keySet()); + for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { + if (availableDataRegionGroups.size() == 1) { + // Only one available DataRegionGroup + seriesPartitionTable.putDataPartition( + timePartitionSlot, availableDataRegionGroups.iterator().next()); + continue; + } + TConsensusGroupId predecessor = + getPartitionManager() + .getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + TConsensusGroupId successor = + getPartitionManager() + .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (predecessor != null + && successor != null + && !predecessor.equals(successor) + && availableDataRegionGroups.size() == 2) { + // Only two available DataRegionGroups and predecessor equals successor + seriesPartitionTable.putDataPartition( + timePartitionSlot, random.nextBoolean() ? successor : predecessor); + continue; + } + TConsensusGroupId targetGroupId; + do { + // Randomly pick a DataRegionGroup from availableDataRegionGroups + targetGroupId = + availableDataRegionGroups.get(random.nextInt(availableDataRegionGroups.size())); + } while (targetGroupId.equals(predecessor) || targetGroupId.equals(successor)); + seriesPartitionTable.putDataPartition(timePartitionSlot, targetGroupId); + } + } + /** * Re-balance the DataPartitionPolicyTable. *
