This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch Computing-resource-balancing_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c6356e935fdff9c0f9220c3419b06b00dc6bf821 Author: YongzaoDan <[email protected]> AuthorDate: Fri Jul 21 16:03:27 2023 +0800 Function Finish --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 14 -- .../confignode/conf/ConfigNodeDescriptor.java | 6 - .../iotdb/confignode/manager/load/LoadManager.java | 21 +- .../manager/load/balancer/PartitionBalancer.java | 270 ++++++++++++++++++++- .../load/balancer/partition/DataAllotTable.java | 218 +++++++++++++++++ .../partition/GreedyPartitionAllocator.java | 203 ---------------- .../balancer/partition/IPartitionAllocator.java | 55 ----- .../manager/partition/PartitionManager.java | 123 ++++++++-- .../partition/DatabasePartitionTable.java | 82 ++++++- .../persistence/partition/PartitionInfo.java | 111 ++++++++- .../statemachine/CreateRegionGroupsProcedure.java | 6 + .../resources/conf/iotdb-common.properties | 7 - .../commons/partition/DataPartitionTable.java | 123 +++++++++- .../commons/partition/SeriesPartitionTable.java | 119 +++++++-- .../iotdb/commons/structure/BalanceTreeMap.java | 75 ++++++ 15 files changed, 1066 insertions(+), 367 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 2907c3b1eff..6b99e0da369 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -110,12 +110,6 @@ public class ConfigNodeConfig { private RegionBalancer.RegionGroupAllocatePolicy regionGroupAllocatePolicy = RegionBalancer.RegionGroupAllocatePolicy.GREEDY; - /** - * DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the - * predecessor or successor TimePartitionSlot if set true. - */ - private boolean enableDataPartitionInheritPolicy = true; - /** Max concurrent client number. 
*/ private int rpcMaxConcurrentClientNum = 65535; @@ -559,14 +553,6 @@ public class ConfigNodeConfig { this.regionGroupAllocatePolicy = regionGroupAllocatePolicy; } - public boolean isEnableDataPartitionInheritPolicy() { - return enableDataPartitionInheritPolicy; - } - - public void setEnableDataPartitionInheritPolicy(boolean enableDataPartitionInheritPolicy) { - this.enableDataPartitionInheritPolicy = enableDataPartitionInheritPolicy; - } - public int getThriftServerAwaitTimeForStopService() { return thriftServerAwaitTimeForStopService; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 59eab23ddc4..5490b700c36 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -272,12 +272,6 @@ public class ConfigNodeDescriptor { "The configured region allocate strategy does not exist, use the default: GREEDY!"); } - conf.setEnableDataPartitionInheritPolicy( - Boolean.parseBoolean( - properties.getProperty( - "enable_data_partition_inherit_policy", - String.valueOf(conf.isEnableDataPartitionInheritPolicy())))); - conf.setCnRpcAdvancedCompressionEnable( Boolean.parseBoolean( properties diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index d88ebd70fe4..0c284a149ee 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -94,7 +94,7 @@ public class LoadManager { /** * Generate an optimal CreateRegionGroupsPlan. 
* - * @param allotmentMap Map<StorageGroupName, Region allotment> + * @param allotmentMap Map<DatabaseName, Region allotment> * @param consensusGroupType TConsensusGroupType of RegionGroup to be allocated * @return CreateRegionGroupsPlan * @throws NotEnoughDataNodeException If there are not enough DataNodes @@ -110,7 +110,7 @@ public class LoadManager { * Allocate SchemaPartitions. * * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned - * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result + * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result */ public Map<String, SchemaPartitionTable> allocateSchemaPartition( Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) @@ -122,7 +122,7 @@ public class LoadManager { * Allocate DataPartitions. * * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned - * @return Map<StorageGroupName, DataPartitionTable>, the allocating result + * @return Map<DatabaseName, DataPartitionTable>, the allocating result */ public Map<String, DataPartitionTable> allocateDataPartition( Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) @@ -130,6 +130,19 @@ public class LoadManager { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } + /** + * Re-balance runtime status cached in the PartitionBalancer. This method may shift the + * currentTimePartition or update the DataAllotTable. 
+ */ + public void reBalancePartitionPolicyIfNecessary( + Map<String, DataPartitionTable> assignedDataPartition) { + partitionBalancer.reBalanceDataPartitionPolicyIfNecessary(assignedDataPartition); + } + + public void updateDataAllotTable(String database) { + partitionBalancer.updateDataAllotTable(database); + } + public void broadcastLatestRegionRouteMap() { statisticsService.broadcastLatestRegionRouteMap(); } @@ -138,12 +151,14 @@ public class LoadManager { loadCache.initHeartbeatCache(configManager); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); + partitionBalancer.setupPartitionBalancer(); } public void stopLoadServices() { heartbeatService.stopHeartbeatService(); statisticsService.stopLoadStatisticsService(); loadCache.clearHeartbeatCache(); + partitionBalancer.clearPartitionBalancer(); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index 89d9235911c..f0d408e91cd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -16,19 +16,37 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.iotdb.confignode.manager.load.balancer; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.structure.BalanceTreeMap; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator; -import org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.partition.DataAllotTable; +import org.apache.iotdb.confignode.manager.partition.PartitionManager; +import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * The SeriesPartitionSlotBalancer provides interfaces to generate optimal Partition allocation and @@ -36,38 +54,272 @@ import java.util.Map; */ public class PartitionBalancer { + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final int SERIES_SLOT_NUM 
= CONF.getSeriesSlotNum(); + private static final long TIME_PARTITION_INTERVAL = + CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionBalancer.class); + private final IManager configManager; + // Map<DatabaseName, DataAllotTable> + private final Map<String, DataAllotTable> dataAllotTableMap; + public PartitionBalancer(IManager configManager) { this.configManager = configManager; + this.dataAllotTableMap = new ConcurrentHashMap<>(); } /** * Allocate SchemaPartitions * * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned - * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result + * @return Map<DatabaseName, SchemaPartitionTable>, the allocating result */ public Map<String, SchemaPartitionTable> allocateSchemaPartition( Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) throws NoAvailableRegionGroupException { - return genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); + Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>(); + + for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry : + unassignedSchemaPartitionSlotsMap.entrySet()) { + final String database = slotsMapEntry.getKey(); + final List<TSeriesPartitionSlot> unassignedPartitionSlots = slotsMapEntry.getValue(); + + // Filter available SchemaRegionGroups and + // sort them by the number of allocated SchemaPartitions + BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>(); + List<Pair<Long, TConsensusGroupId>> regionSlotsCounter = + getPartitionManager() + .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.SchemaRegion); + for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) { + counter.put(pair.getRight(), pair.getLeft().intValue()); + } + + // Enumerate SeriesPartitionSlot + Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = 
new ConcurrentHashMap<>(); + for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) { + // Greedy allocation: allocate the unassigned SchemaPartition to + // the RegionGroup whose allocated SchemaPartitions is the least + TConsensusGroupId consensusGroupId = counter.getKeyWithMinValue(); + schemaPartitionMap.put(seriesPartitionSlot, consensusGroupId); + counter.put(consensusGroupId, counter.get(consensusGroupId) + 1); + } + result.put(database, new SchemaPartitionTable(schemaPartitionMap)); + } + + return result; } /** * Allocate DataPartitions * * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned - * @return Map<StorageGroupName, DataPartitionTable>, the allocating result + * @return Map<DatabaseName, DataPartitionTable>, the allocating result */ public Map<String, DataPartitionTable> allocateDataPartition( Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) throws NoAvailableRegionGroupException { - return genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap); + Map<String, DataPartitionTable> result = new ConcurrentHashMap<>(); + + for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry : + unassignedDataPartitionSlotsMap.entrySet()) { + final String database = slotsMapEntry.getKey(); + final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap = + slotsMapEntry.getValue(); + + // Filter available DataRegionGroups and + // sort them by the number of allocated DataPartitions + BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>(); + List<Pair<Long, TConsensusGroupId>> regionSlotsCounter = + getPartitionManager() + .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion); + for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) { + counter.put(pair.getRight(), pair.getLeft().intValue()); + } + + DataAllotTable allotTable = dataAllotTableMap.get(database); + 
allotTable.acquireReadLock(); + TTimePartitionSlot currentTimePartition = allotTable.getCurrentTimePartition(); + DataPartitionTable dataPartitionTable = new DataPartitionTable(); + + try { + // Enumerate SeriesPartitionSlot + for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry : + unassignedPartitionSlotsMap.entrySet()) { + SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable(); + + // Enumerate TimePartitionSlot in ascending order + TSeriesPartitionSlot seriesPartitionSlot = seriesPartitionEntry.getKey(); + List<TTimePartitionSlot> timePartitionSlots = + seriesPartitionEntry.getValue().getTimePartitionSlots(); + timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime)); + + for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { + + // 1. The historical DataPartition will try to inherit successor DataPartition first + if (timePartitionSlot.getStartTime() < currentTimePartition.getStartTime()) { + TConsensusGroupId successor = + getPartitionManager() + .getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot); + if (successor != null && counter.containsKey(successor)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, successor); + counter.put(successor, counter.get(successor) + 1); + continue; + } + } + + // 2. Assign DataPartition base on the DataAllotTable + TConsensusGroupId allotGroupId = allotTable.getRegionGroupId(seriesPartitionSlot); + if (counter.containsKey(allotGroupId)) { + seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId); + counter.put(allotGroupId, counter.get(allotGroupId) + 1); + continue; + } + + // 3. 
Assign the DataPartition to DataRegionGroup with the least DataPartitions + // If the above DataRegionGroups are unavailable + TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue(); + seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId); + counter.put(greedyGroupId, counter.get(greedyGroupId) + 1); + } + + dataPartitionTable + .getDataPartitionMap() + .put(seriesPartitionEntry.getKey(), seriesPartitionTable); + } + } finally { + allotTable.releaseReadLock(); + } + result.put(database, dataPartitionTable); + } + + return result; + } + + /** + * Try to re-balance the DataPartitionPolicy when new DataPartitions are created. + * + * @param assignedDataPartition new created DataPartitions + */ + public void reBalanceDataPartitionPolicyIfNecessary( + Map<String, DataPartitionTable> assignedDataPartition) { + assignedDataPartition.forEach( + (database, dataPartitionTable) -> { + if (updateDataPartitionCount(database, dataPartitionTable)) { + // Update the DataAllotTable if the currentTimePartition is updated + updateDataAllotTable(database); + } + }); + } + + /** + * Update the DataPartitionCount in DataAllotTable + * + * @param database Database name + * @param dataPartitionTable new created DataPartitionTable + * @return true if the currentTimePartition is updated, false otherwise + */ + public boolean updateDataPartitionCount(String database, DataPartitionTable dataPartitionTable) { + try { + dataAllotTableMap + .get(database) + .addTimePartitionCount(dataPartitionTable.getTimeSlotCountMap()); + return dataAllotTableMap + .get(database) + .updateCurrentTimePartition( + getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion)); + } catch (DatabaseNotExistsException e) { + LOGGER.error("Database {} not exists", database); + return false; + } + } + + /** + * Update the DataAllotTable + * + * @param database Database name + */ + public void updateDataAllotTable(String database) { + TTimePartitionSlot 
currentTimePartition = + dataAllotTableMap.get(database).getCurrentTimePartition(); + Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + Pair<TTimePartitionSlot, TConsensusGroupId> lastDataPartition = + getPartitionManager().getLastDataPartition(database, seriesPartitionSlot); + if (lastDataPartition != null + && currentTimePartition.compareTo(lastDataPartition.getLeft()) < 0) { + // Put all future DataPartitions into the allocatedTable + allocatedTable.put(seriesPartitionSlot, lastDataPartition.getRight()); + } + } + + try { + dataAllotTableMap + .get(database) + .updateDataAllotTable( + getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion), + allocatedTable); + } catch (DatabaseNotExistsException e) { + LOGGER.error("Database {} not exists", database); + } + } + + /** Set up the PartitionBalancer when the current ConfigNode becomes leader. 
*/ + public void setupPartitionBalancer() { + dataAllotTableMap.clear(); + getClusterSchemaManager() + .getDatabaseNames() + .forEach( + database -> { + dataAllotTableMap.put(database, new DataAllotTable()); + DataAllotTable dataAllotTable = dataAllotTableMap.get(database); + dataAllotTable.acquireWriteLock(); + try { + int threshold = + DataAllotTable.timePartitionThreshold( + getPartitionManager() + .getRegionGroupCount(database, TConsensusGroupType.DataRegion)); + TTimePartitionSlot maxTimePartitionSlot = + getPartitionManager().getMaxTimePartitionSlot(database); + TTimePartitionSlot minTimePartitionSlot = + getPartitionManager().getMinTimePartitionSlot(database); + TTimePartitionSlot currentTimePartition = maxTimePartitionSlot.deepCopy(); + while (currentTimePartition.compareTo(minTimePartitionSlot) > 0) { + int seriesSlotCount = + getPartitionManager().countSeriesSlot(database, currentTimePartition); + if (seriesSlotCount >= threshold) { + dataAllotTable.setCurrentTimePartition(currentTimePartition.getStartTime()); + break; + } + dataAllotTable.addTimePartitionCount( + Collections.singletonMap(currentTimePartition, seriesSlotCount)); + currentTimePartition.setStartTime( + currentTimePartition.getStartTime() - TIME_PARTITION_INTERVAL); + } + + dataAllotTable.setDataAllotTable( + getPartitionManager().getLastDataAllotTable(database)); + } catch (DatabaseNotExistsException e) { + LOGGER.error("Database {} not exists", database); + } finally { + dataAllotTable.releaseWriteLock(); + } + }); + } + + /** Clear the PartitionBalancer when the current ConfigNode is no longer the leader. 
*/ + public void clearPartitionBalancer() { + dataAllotTableMap.clear(); + } + + private ClusterSchemaManager getClusterSchemaManager() { + return configManager.getClusterSchemaManager(); } - private IPartitionAllocator genPartitionAllocator() { - // TODO: The type of PartitionAllocator should be configurable - return new GreedyPartitionAllocator(configManager); + private PartitionManager getPartitionManager() { + return configManager.getPartitionManager(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java new file mode 100644 index 00000000000..f334f490888 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.structure.BalanceTreeMap; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class DataAllotTable { + + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); + + private final ReentrantReadWriteLock dataAllotTableLock; + private final AtomicReference<TTimePartitionSlot> currentTimePartition; + // Map<TimePartitionSlot, DataPartitionCount> + // Cache the number of DataPartitions in each future TimePartitionSlot + private final TreeMap<TTimePartitionSlot, AtomicInteger> dataPartitionCounter; + // Map<SeriesPartitionSlot, RegionGroupId> + // The optimal allocation of SeriesSlots to RegionGroups in the currentTimePartition + private final Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable; + + public DataAllotTable() { + this.dataAllotTableLock = new ReentrantReadWriteLock(); + this.currentTimePartition = new AtomicReference<>(new TTimePartitionSlot(0)); + this.dataPartitionCounter = new TreeMap<>(); + this.dataAllotTable = new HashMap<>(); + } + + public boolean isEmpty() { + dataAllotTableLock.readLock().lock(); + try { + return dataAllotTable.isEmpty(); + } finally { + dataAllotTableLock.readLock().unlock(); + 
} + } + + /** + * Update the DataAllotTable according to the current DataRegionGroups and future DataAllotTable. + * + * @param dataRegionGroups the current DataRegionGroups + * @param allocatedTable the future DataAllotTable, i.e. some SeriesSlots have already allocated + */ + public void updateDataAllotTable( + List<TConsensusGroupId> dataRegionGroups, + Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable) { + dataAllotTableLock.writeLock().lock(); + try { + // mu is the average number of slots allocated to each regionGroup + int mu = SERIES_SLOT_NUM / dataRegionGroups.size(); + // Decide all SeriesSlot randomly + List<TSeriesPartitionSlot> seriesSlotList = new ArrayList<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + seriesSlotList.add(new TSeriesPartitionSlot(i)); + } + Collections.shuffle(seriesSlotList); + + // The counter will maintain the number of slots allocated to each regionGroup + BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>(); + Map<TConsensusGroupId, AtomicInteger> regionSlotCounter = new HashMap<>(); + allocatedTable.forEach( + (seriesSlot, regionGroupId) -> + regionSlotCounter + .computeIfAbsent(regionGroupId, empty -> new AtomicInteger(0)) + .incrementAndGet()); + dataRegionGroups.forEach( + regionGroupId -> regionSlotCounter.putIfAbsent(regionGroupId, new AtomicInteger(0))); + regionSlotCounter.forEach( + (regionGroupId, slotNum) -> counter.put(regionGroupId, slotNum.get())); + + Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>(); + for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) { + TConsensusGroupId oldRegionGroupId = dataAllotTable.get(seriesPartitionSlot); + if (counter.get(oldRegionGroupId) < mu) { + // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average + newAllotTable.put(seriesPartitionSlot, oldRegionGroupId); + } else { + // Otherwise, choose the regionGroup with the least slotNum to keep load balance + TConsensusGroupId 
newRegionGroupId = counter.getKeyWithMinValue(); + newAllotTable.put(seriesPartitionSlot, newRegionGroupId); + counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1); + } + } + + dataAllotTable.clear(); + dataAllotTable.putAll(newAllotTable); + } finally { + dataAllotTableLock.writeLock().unlock(); + } + } + + /** + * Update the current time partition and remove the useless time partitions. + * + * @param regionGroupNum the number of regionGroups + * @return whether the current time partition is updated + */ + public boolean updateCurrentTimePartition(int regionGroupNum) { + dataAllotTableLock.writeLock().lock(); + try { + AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE); + dataPartitionCounter.forEach( + (timePartition, counter) -> { + // Select the maximum TimePartition whose slotNum is greater than the following equation + // Ensure that the remaining slots can be still distributed to new regionGroups + if (counter.get() >= timePartitionThreshold(regionGroupNum) + && timePartition.getStartTime() > newStartTime.get()) { + newStartTime.set(timePartition.getStartTime()); + } + }); + + if (newStartTime.get() > currentTimePartition.get().getStartTime()) { + currentTimePartition.set(new TTimePartitionSlot(newStartTime.get())); + dataPartitionCounter + .keySet() + .forEach( + timePartition -> { + // Remove the useless TimePartitions + if (timePartition.getStartTime() < newStartTime.get()) { + dataPartitionCounter.remove(timePartition); + } + }); + return true; + } + } finally { + dataAllotTableLock.writeLock().unlock(); + } + return false; + } + + public void addTimePartitionCount(Map<TTimePartitionSlot, Integer> timePartitionCountMap) { + dataAllotTableLock.writeLock().lock(); + try { + timePartitionCountMap.forEach( + (timePartition, count) -> + dataPartitionCounter + .computeIfAbsent(timePartition, empty -> new AtomicInteger(0)) + .addAndGet(count)); + } finally { + dataAllotTableLock.writeLock().unlock(); + } + } + + public TTimePartitionSlot 
getCurrentTimePartition() { + return currentTimePartition.get(); + } + + public TConsensusGroupId getRegionGroupId(TSeriesPartitionSlot seriesPartitionSlot) { + dataAllotTableLock.readLock().lock(); + try { + return dataAllotTable.get(seriesPartitionSlot); + } finally { + dataAllotTableLock.readLock().unlock(); + } + } + + /** Only use this interface when init PartitionBalancer. */ + public void setCurrentTimePartition(long startTime) { + currentTimePartition.set(new TTimePartitionSlot(startTime)); + } + + /** Only use this interface when init PartitionBalancer. */ + public void setDataAllotTable(Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable) { + this.dataAllotTable.putAll(dataAllotTable); + } + + public static int timePartitionThreshold(int regionGroupNum) { + return (int) (SERIES_SLOT_NUM * (1.0 - 2.0 / regionGroupNum)); + } + + public void acquireReadLock() { + dataAllotTableLock.readLock().lock(); + } + + public void releaseReadLock() { + dataAllotTableLock.readLock().unlock(); + } + + public void acquireWriteLock() { + dataAllotTableLock.writeLock().lock(); + } + + public void releaseWriteLock() { + dataAllotTableLock.writeLock().unlock(); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java deleted file mode 100644 index aacf165a3ea..00000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.manager.load.balancer.partition; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.partition.DataPartitionTable; -import org.apache.iotdb.commons.partition.SchemaPartitionTable; -import org.apache.iotdb.commons.partition.SeriesPartitionTable; -import org.apache.iotdb.confignode.conf.ConfigNodeConfig; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; -import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; -import org.apache.iotdb.confignode.manager.IManager; -import org.apache.iotdb.confignode.manager.partition.PartitionManager; -import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.tsfile.utils.Pair; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** Allocating new Partitions by greedy algorithm. 
*/ -public class GreedyPartitionAllocator implements IPartitionAllocator { - - private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - private static final boolean ENABLE_DATA_PARTITION_INHERIT_POLICY = - CONF.isEnableDataPartitionInheritPolicy(); - private static final long TIME_PARTITION_INTERVAL = - CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); - - private final IManager configManager; - - public GreedyPartitionAllocator(IManager configManager) { - this.configManager = configManager; - } - - @Override - public Map<String, SchemaPartitionTable> allocateSchemaPartition( - Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) - throws NoAvailableRegionGroupException { - Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>(); - - for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry : - unassignedSchemaPartitionSlotsMap.entrySet()) { - final String storageGroup = slotsMapEntry.getKey(); - final List<TSeriesPartitionSlot> unassignedPartitionSlots = slotsMapEntry.getValue(); - - // List<Pair<allocatedSlotsNum, TConsensusGroupId>> - List<Pair<Long, TConsensusGroupId>> regionSlotsCounter = - getPartitionManager() - .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.SchemaRegion); - - // Enumerate SeriesPartitionSlot - Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new ConcurrentHashMap<>(); - for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) { - // Greedy allocation - schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight()); - // Bubble sort - bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter); - } - result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap)); - } - - return result; - } - - @Override - public Map<String, DataPartitionTable> allocateDataPartition( - Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) - 
throws NoAvailableRegionGroupException { - Map<String, DataPartitionTable> result = new ConcurrentHashMap<>(); - - for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry : - unassignedDataPartitionSlotsMap.entrySet()) { - final String database = slotsMapEntry.getKey(); - final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap = - slotsMapEntry.getValue(); - - // List<Pair<allocatedSlotsNum, TConsensusGroupId>> - List<Pair<Long, TConsensusGroupId>> regionSlotsCounter = - getPartitionManager() - .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion); - - DataPartitionTable dataPartitionTable = new DataPartitionTable(); - - // Enumerate SeriesPartitionSlot - for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry : - unassignedPartitionSlotsMap.entrySet()) { - SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable(); - - // Enumerate TimePartitionSlot in ascending order - List<TTimePartitionSlot> timePartitionSlots = - seriesPartitionEntry.getValue().getTimePartitionSlots(); - timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime)); - for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) { - - /* 1. 
Inherit policy */ - if (ENABLE_DATA_PARTITION_INHERIT_POLICY) { - // Check if the current Partition's neighbor(predecessor or successor) - // is allocated in the same batch of Partition creation - TConsensusGroupId neighbor = - seriesPartitionTable.getAdjacentDataPartition( - timePartitionSlot, TIME_PARTITION_INTERVAL); - if (neighbor != null) { - seriesPartitionTable - .getSeriesPartitionMap() - .put(timePartitionSlot, Collections.singletonList(neighbor)); - bubbleSort(neighbor, regionSlotsCounter); - continue; - } - - // Check if the current Partition's neighbor(predecessor or successor) - // was allocated in the former Partition creation - neighbor = - getPartitionManager() - .getAdjacentDataPartition( - database, - seriesPartitionEntry.getKey(), - timePartitionSlot, - TIME_PARTITION_INTERVAL); - if (neighbor != null) { - seriesPartitionTable - .getSeriesPartitionMap() - .put(timePartitionSlot, Collections.singletonList(neighbor)); - bubbleSort(neighbor, regionSlotsCounter); - continue; - } - } - - /* 2. Greedy policy */ - seriesPartitionTable - .getSeriesPartitionMap() - .put( - timePartitionSlot, - Collections.singletonList(regionSlotsCounter.get(0).getRight())); - bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter); - } - dataPartitionTable - .getDataPartitionMap() - .put(seriesPartitionEntry.getKey(), seriesPartitionTable); - } - result.put(database, dataPartitionTable); - } - - return result; - } - - /** - * Bubble sort the regionSlotsCounter from the specified consensus group - * - * <p>Notice: Here we use bubble sort instead of other sorting algorithm is because that, there is - * only one Partition allocated in each loop. 
Therefore, only consider one consensus group weight - * change is enough - * - * @param consensusGroupId The consensus group where the new Partition is allocated - * @param regionSlotsCounter List<Pair<Allocated Partition num, TConsensusGroupId>> - */ - private void bubbleSort( - TConsensusGroupId consensusGroupId, List<Pair<Long, TConsensusGroupId>> regionSlotsCounter) { - // Find the corresponding consensus group - int index = 0; - for (int i = 0; i < regionSlotsCounter.size(); i++) { - if (regionSlotsCounter.get(i).getRight().equals(consensusGroupId)) { - index = i; - break; - } - } - - // Do bubble sort - regionSlotsCounter.get(index).setLeft(regionSlotsCounter.get(index).getLeft() + 1); - while (index < regionSlotsCounter.size() - 1 - && regionSlotsCounter.get(index).getLeft() > regionSlotsCounter.get(index + 1).getLeft()) { - Collections.swap(regionSlotsCounter, index, index + 1); - index += 1; - } - } - - private PartitionManager getPartitionManager() { - return configManager.getPartitionManager(); - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java deleted file mode 100644 index 62ef53d9655..00000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.manager.load.balancer.partition; - -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.commons.partition.DataPartitionTable; -import org.apache.iotdb.commons.partition.SchemaPartitionTable; -import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException; -import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; - -import java.util.List; -import java.util.Map; - -/** - * The IPartitionAllocator is a functional interface, which means a new functional class who - * implements the IPartitionAllocator must be created for each Partition allocation. 
- */ -public interface IPartitionAllocator { - - /** - * Allocate SchemaPartitions - * - * @param unassignedSchemaPartitionSlotsMap SchemaPartitionSlots that should be assigned - * @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result - */ - Map<String, SchemaPartitionTable> allocateSchemaPartition( - Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) - throws NoAvailableRegionGroupException; - - /** - * Allocate DataPartitions - * - * @param unassignedDataPartitionSlotsMap DataPartitionSlots that should be assigned - * @return Map<StorageGroupName, DataPartitionTable>, the allocating result - */ - Map<String, DataPartitionTable> allocateDataPartition( - Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) - throws NoAvailableRegionGroupException; -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index fae18091d36..a671150a308 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan; @@ -85,6 +86,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; +import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.rpc.RpcUtils; @@ -265,14 +267,13 @@ public class PartitionManager { // Cache allocating result only if the current ConfigNode still holds its leadership CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan(); createPlan.setAssignedSchemaPartition(assignedSchemaPartition); - status = getConsensusManager().confirmLeader(); + + status = consensusWritePartitionResult(createPlan); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Here we check the leadership second time - // since the RegionGroup creating process might take some time + // The allocation might fail due to consensus error resp.setStatus(status); return resp; } - getConsensusManager().write(createPlan); } resp = (SchemaPartitionResp) getSchemaPartition(req); @@ -389,14 +390,16 @@ public class PartitionManager { // Cache allocating result only if the current ConfigNode still holds its leadership CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); createPlan.setAssignedDataPartition(assignedDataPartition); - status = getConsensusManager().confirmLeader(); + + status = consensusWritePartitionResult(createPlan); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Here we check the leadership second time - // since the RegionGroup creating process might take some time + // The allocation might fail due to consensus error resp.setStatus(status); return resp; } - getConsensusManager().write(createPlan); + + // Statistical allocation result and re-balance the 
DataPartitionPolicy if necessary + getLoadManager().reBalancePartitionPolicyIfNecessary(assignedDataPartition); } resp = (DataPartitionResp) getDataPartition(req); @@ -431,6 +434,24 @@ public class PartitionManager { return resp; } + private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { + TSStatus status = getConsensusManager().confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // Here we check the leadership second time + // since the RegionGroup creating process might take some time + return status; + } + + ConsensusWriteResponse writeResp = getConsensusManager().write(plan); + if (!writeResp.isSuccessful()) { + // The allocation might fail due to consensus error + status = writeResp.getStatus(); + status.setMessage(writeResp.getErrorMessage()); + LOGGER.error("Write DataPartition allocation result failed because: {}", status); + } + return status; + } + // ====================================================== // Leader scheduling interfaces // ====================================================== @@ -590,22 +611,20 @@ public class PartitionManager { } /** - * Only leader use this interface. Checks whether the specified DataPartition has a predecessor or - * successor and returns if it does + * Only leader use this interface. Checks whether the specified DataPartition has a successor and + * returns if it does. 
* * @param database DatabaseName * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @param timePartitionInterval Time partition interval - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specified DataPartition's successor if exists, null otherwise */ - public TConsensusGroupId getAdjacentDataPartition( + public TConsensusGroupId getSuccessorDataPartition( String database, TSeriesPartitionSlot seriesPartitionSlot, - TTimePartitionSlot timePartitionSlot, - long timePartitionInterval) { - return partitionInfo.getAdjacentDataPartition( - database, seriesPartitionSlot, timePartitionSlot, timePartitionInterval); + TTimePartitionSlot timePartitionSlot) { + return partitionInfo.getSuccessorDataPartition( + database, seriesPartitionSlot, timePartitionSlot); } /** @@ -711,6 +730,21 @@ public class PartitionManager { return partitionInfo.getRegionGroupCount(database, type); } + /** + * Only leader use this interface. + * + * <p>Get all the RegionGroups currently owned by the specified Database + * + * @param database DatabaseName + * @param type SchemaRegion or DataRegion + * @return List of TConsensusGroupId + * @throws DatabaseNotExistsException When the specified Database doesn't exist + */ + public List<TConsensusGroupId> getAllRegionGroupIds(String database, TConsensusGroupType type) + throws DatabaseNotExistsException { + return partitionInfo.getAllRegionGroupIds(database, type); + } + /** * Check if the specified Database exists. * @@ -1249,6 +1283,63 @@ public class PartitionManager { partitionInfo.getDataRegionIds(databases, dataRegionIds); } + /** + * Get the max TimePartitionSlot of the specified Database. 
+ * + * @param database The specified Database + * @return The max TimePartitionSlot, null if the Database doesn't exist or there are no + * DataPartitions yet + */ + public TTimePartitionSlot getMaxTimePartitionSlot(String database) { + return partitionInfo.getMaxTimePartitionSlot(database); + } + + /** + * Get the min TimePartitionSlot of the specified Database. + * + * @param database The specified Database + * @return The min TimePartitionSlot, null if the Database doesn't exist or there are no + * DataPartitions yet + */ + public TTimePartitionSlot getMinTimePartitionSlot(String database) { + return partitionInfo.getMinTimePartitionSlot(database); + } + + /** + * Get the DataPartition with max TimePartition of the specified Database and the + * SeriesPartitionSlot. + * + * @param database The specified Database + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The last DataPartition, null if the Database doesn't exist or there are no + * DataPartitions in the specified SeriesPartitionSlot + */ + public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + String database, TSeriesPartitionSlot seriesPartitionSlot) { + return partitionInfo.getLastDataPartition(database, seriesPartitionSlot); + } + + /** + * Count SeriesSlot in the specified TimePartitionSlot of the Database. + * + * @param database The specified Database + * @param timePartitionSlot The specified TimePartitionSlot + * @return The count of SeriesSlot + */ + public int countSeriesSlot(String database, TTimePartitionSlot timePartitionSlot) { + return partitionInfo.countSeriesSlot(database, timePartitionSlot); + } + + /** + * Get the last DataAllotTable of the specified Database. 
+ * + * @param database The specified Database + * @return The last DataAllotTable + */ + public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable(String database) { + return partitionInfo.getLastDataAllotTable(database); + } + public ScheduledExecutorService getRegionMaintainer() { return regionMaintainer; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index b2ca2faf6b7..c1516e6f7c9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -220,6 +220,27 @@ public class DatabasePartitionTable { return result.getAndIncrement(); } + /** + * Only leader use this interface. + * + * <p>Get all the RegionGroups currently owned by the specified Database + * + * @param type SchemaRegion or DataRegion + * @return List of TConsensusGroupId + */ + public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType type) { + List<TConsensusGroupId> result = new Vector<>(); + regionGroupMap + .values() + .forEach( + regionGroup -> { + if (regionGroup.getId().getType().equals(type)) { + result.add(regionGroup.getId()); + } + }); + return result; + } + public int getAssignedSeriesPartitionSlotsCount() { return Math.max( schemaPartitionTable.getSchemaPartitionMap().size(), @@ -251,19 +272,15 @@ public class DatabasePartitionTable { } /** - * Checks whether the specified DataPartition has a predecessor and returns if it does. + * Checks whether the specified DataPartition has a successor and returns if it does. 
* * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @param timePartitionInterval Time partition interval - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specified DataPartition's successor if exists, null otherwise */ - public TConsensusGroupId getAdjacentDataPartition( - TSeriesPartitionSlot seriesPartitionSlot, - TTimePartitionSlot timePartitionSlot, - long timePartitionInterval) { - return dataPartitionTable.getAdjacentDataPartition( - seriesPartitionSlot, timePartitionSlot, timePartitionInterval); + public TConsensusGroupId getSuccessorDataPartition( + TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { + return dataPartitionTable.getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot); } /** @@ -553,6 +570,55 @@ public class DatabasePartitionTable { return dataRegionIds; } + /** + * Get the max TimePartitionSlot. + * + * @return The max TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMaxTimePartitionSlot() { + return dataPartitionTable.getMaxTimePartitionSlot(); + } + + /** + * Get the min TimePartitionSlot. + * + * @return The min TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMinTimePartitionSlot() { + return dataPartitionTable.getMinTimePartitionSlot(); + } + + /** + * Get the DataPartition with max TimePartition of the specified SeriesPartitionSlot. + * + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The last DataPartition, null if there are no DataPartitions in the specified + * SeriesPartitionSlot + */ + public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + TSeriesPartitionSlot seriesPartitionSlot) { + return dataPartitionTable.getLastDataPartition(seriesPartitionSlot); + } + + /** + * Count SeriesSlot in the specified TimePartitionSlot. 
+ * + * @param timePartitionSlot The specified TimePartitionSlot + * @return The count of SeriesSlot + */ + public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) { + return dataPartitionTable.countSeriesSlot(timePartitionSlot); + } + + /** + * Get the last DataAllotTable. + * + * @return The last DataAllotTable + */ + public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() { + return dataPartitionTable.getLastDataAllotTable(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index d85c63e4e10..a0e75d29168 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -79,6 +79,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -385,24 +386,21 @@ public class PartitionInfo implements SnapshotProcessor { } /** - * Checks whether the specified DataPartition has a predecessor or successor and returns if it - * does. + * Checks whether the specified DataPartition has a successor and returns if it does. 
* * @param database DatabaseName * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @param timePartitionInterval Time partition interval - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specified DataPartition's successor if exists, null otherwise */ - public TConsensusGroupId getAdjacentDataPartition( + public TConsensusGroupId getSuccessorDataPartition( String database, TSeriesPartitionSlot seriesPartitionSlot, - TTimePartitionSlot timePartitionSlot, - long timePartitionInterval) { - if (databasePartitionTables.containsKey(database)) { + TTimePartitionSlot timePartitionSlot) { + if (isDatabaseExisted(database)) { return databasePartitionTables .get(database) - .getAdjacentDataPartition(seriesPartitionSlot, timePartitionSlot, timePartitionInterval); + .getSuccessorDataPartition(seriesPartitionSlot, timePartitionSlot); } else { return null; } @@ -722,6 +720,25 @@ public class PartitionInfo implements SnapshotProcessor { return databasePartitionTables.get(database).getRegionGroupCount(type); } + /** + * Only leader use this interface. + * + * <p>Get all the RegionGroups currently owned by the specified Database + * + * @param database DatabaseName + * @param type SchemaRegion or DataRegion + * @return List of TConsensusGroupId + * @throws DatabaseNotExistsException When the specified Database doesn't exist + */ + public List<TConsensusGroupId> getAllRegionGroupIds(String database, TConsensusGroupType type) + throws DatabaseNotExistsException { + if (!isDatabaseExisted(database)) { + throw new DatabaseNotExistsException(database); + } + + return databasePartitionTables.get(database).getAllRegionGroupIds(type); + } + /** * Only leader use this interface. * @@ -761,14 +778,14 @@ public class PartitionInfo implements SnapshotProcessor { /** * Only leader use this interface. 
* - * @param storageGroup StorageGroupName + * @param database DatabaseName * @param type SchemaRegion or DataRegion * @return The StorageGroup's Running or Available Regions that sorted by the number of allocated * slots */ public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter( - String storageGroup, TConsensusGroupType type) { - return databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type); + String database, TConsensusGroupType type) { + return databasePartitionTables.get(database).getRegionGroupSlotsCounter(type); } /** @@ -784,6 +801,78 @@ public class PartitionInfo implements SnapshotProcessor { return schemaPartitionSet; } + /** + * Get the max TimePartitionSlot of the specified Database. + * + * @param database The specified Database + * @return The max TimePartitionSlot, null if the Database doesn't exist or there are no + * DataPartitions yet + */ + public TTimePartitionSlot getMaxTimePartitionSlot(String database) { + if (isDatabaseExisted(database)) { + return databasePartitionTables.get(database).getMaxTimePartitionSlot(); + } + return null; + } + + /** + * Get the min TimePartitionSlot of the specified Database. + * + * @param database The specified Database + * @return The min TimePartitionSlot, null if the Database doesn't exist or there are no + * DataPartitions yet + */ + public TTimePartitionSlot getMinTimePartitionSlot(String database) { + if (isDatabaseExisted(database)) { + return databasePartitionTables.get(database).getMinTimePartitionSlot(); + } + return null; + } + + /** + * Get the DataPartition with max TimePartition of the specified Database and the + * SeriesPartitionSlot. 
+ * + * @param database The specified Database + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The last DataPartition, null if the Database doesn't exist or there are no + * DataPartitions in the specified SeriesPartitionSlot + */ + public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + String database, TSeriesPartitionSlot seriesPartitionSlot) { + if (isDatabaseExisted(database)) { + return databasePartitionTables.get(database).getLastDataPartition(seriesPartitionSlot); + } + return null; + } + + /** + * Count SeriesSlot in the specified TimePartitionSlot of the Database. + * + * @param database The specified Database + * @param timePartitionSlot The specified TimePartitionSlot + * @return The count of SeriesSlot + */ + public int countSeriesSlot(String database, TTimePartitionSlot timePartitionSlot) { + if (isDatabaseExisted(database)) { + return databasePartitionTables.get(database).countSeriesSlot(timePartitionSlot); + } + return 0; + } + + /** + * Get the last DataAllotTable of the specified Database. 
+ * + * @param database The specified Database + * @return The last DataAllotTable + */ + public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable(String database) { + if (isDatabaseExisted(database)) { + return databasePartitionTables.get(database).getLastDataAllotTable(); + } + return new HashMap<>(); + } + @Override public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java index 78abfbca2dc..3072746b378 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java @@ -201,6 +201,12 @@ public class CreateRegionGroupsProcedure setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH); break; case CREATE_REGION_GROUPS_FINISH: + // Update all corresponding DataAllotTables + createRegionGroupsPlan + .getRegionGroupMap() + .keySet() + .forEach( + database -> env.getConfigManager().getLoadManager().updateDataAllotTable(database)); return Flow.NO_MORE_STATE; } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 2a6ead060ec..08cc77c3c35 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -128,13 +128,6 @@ cluster_name=defaultCluster # data_region_per_data_node=5.0 -# Whether to enable the DataPartition inherit policy. 
-# DataPartition within the same SeriesPartitionSlot will inherit the allocation result of -# the predecessor or successor TimePartitionSlot if set true -# Datatype: Boolean -# enable_data_partition_inherit_policy=true - - # The policy of cluster RegionGroups' leader distribution. # E.g. we should balance cluster RegionGroups' leader distribution when some DataNodes are shutdown or re-connected. # These policies are currently supported: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 1a5611536ae..6c2108f5898 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; @@ -34,12 +35,15 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class DataPartitionTable { @@ -99,22 +103,16 @@ public class DataPartitionTable { } /** - * Checks whether the specified DataPartition has a predecessor or successor and returns 
if it - * does + * Checks whether the specified DataPartition has a successor and returns if it does. * * @param seriesPartitionSlot Corresponding SeriesPartitionSlot * @param timePartitionSlot Corresponding TimePartitionSlot - * @param timePartitionInterval Time partition interval - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specified DataPartition's successor if exists, null otherwise */ - public TConsensusGroupId getAdjacentDataPartition( - TSeriesPartitionSlot seriesPartitionSlot, - TTimePartitionSlot timePartitionSlot, - long timePartitionInterval) { + public TConsensusGroupId getSuccessorDataPartition( + TSeriesPartitionSlot seriesPartitionSlot, TTimePartitionSlot timePartitionSlot) { if (dataPartitionMap.containsKey(seriesPartitionSlot)) { - return dataPartitionMap - .get(seriesPartitionSlot) - .getAdjacentDataPartition(timePartitionSlot, timePartitionInterval); + return dataPartitionMap.get(seriesPartitionSlot).getSuccessorDataPartition(timePartitionSlot); } else { return null; } @@ -236,6 +234,111 @@ public class DataPartitionTable { .collect(Collectors.toList()); } + /** + * Get the max TimePartitionSlot of the specified Database. + * + * @return The max TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMaxTimePartitionSlot() { + AtomicReference<TTimePartitionSlot> maxTimeSlot = + new AtomicReference<>(new TTimePartitionSlot(0)); + dataPartitionMap + .values() + .forEach( + seriesPartitionTable -> { + TTimePartitionSlot timePartitionSlot = seriesPartitionTable.getMaxTimePartitionSlot(); + if (timePartitionSlot != null + && timePartitionSlot.getStartTime() > maxTimeSlot.get().getStartTime()) { + maxTimeSlot.set(timePartitionSlot); + } + }); + return maxTimeSlot.get().getStartTime() > 0 ? maxTimeSlot.get() : null; + } + + /** + * Get the min TimePartitionSlot of the specified Database. 
+ * + * @return The min TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMinTimePartitionSlot() { + AtomicReference<TTimePartitionSlot> minTimeSlot = + new AtomicReference<>(new TTimePartitionSlot(Long.MAX_VALUE)); + dataPartitionMap + .values() + .forEach( + seriesPartitionTable -> { + TTimePartitionSlot timePartitionSlot = seriesPartitionTable.getMinTimePartitionSlot(); + if (timePartitionSlot != null + && timePartitionSlot.getStartTime() < minTimeSlot.get().getStartTime()) { + minTimeSlot.set(timePartitionSlot); + } + }); + return minTimeSlot.get().getStartTime() < Long.MAX_VALUE ? minTimeSlot.get() : null; + } + + /** + * Get the DataPartition with max TimePartition of the specified Database and the + * SeriesPartitionSlot. + * + * @param seriesPartitionSlot The specified SeriesPartitionSlot + * @return The last DataPartition, null if there are no DataPartitions in the specified + * SeriesPartitionSlot + */ + public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition( + TSeriesPartitionSlot seriesPartitionSlot) { + if (dataPartitionMap.containsKey(seriesPartitionSlot)) { + return dataPartitionMap.get(seriesPartitionSlot).getLastDataPartition(); + } else { + return null; + } + } + + /** + * Count SeriesSlot in the specified TimePartitionSlot of the Database. + * + * @param timePartitionSlot The specified TimePartitionSlot + * @return The count of SeriesSlot + */ + public int countSeriesSlot(TTimePartitionSlot timePartitionSlot) { + AtomicInteger count = new AtomicInteger(0); + dataPartitionMap + .values() + .forEach( + seriesPartitionTable -> + count.addAndGet(seriesPartitionTable.isDataPartitionExist(timePartitionSlot))); + return count.get(); + } + + /** + * Get the last DataAllotTable. 
+ * + * @return The last DataAllotTable + */ + public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() { + Map<TSeriesPartitionSlot, TConsensusGroupId> result = new HashMap<>(); + dataPartitionMap.forEach( + (seriesPartitionSlot, seriesPartitionTable) -> + result.put(seriesPartitionSlot, seriesPartitionTable.getLastConsensusGroupId())); + return result; + } + + /** + * Get the number of DataPartitions in each TimePartitionSlot + * + * @return Map<TimePartitionSlot, the number of DataPartitions> + */ + public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() { + Map<TTimePartitionSlot, Integer> result = new ConcurrentHashMap<>(); + dataPartitionMap.forEach( + (seriesPartitionSlot, seriesPartitionTable) -> + seriesPartitionTable + .getTimeSlotCountMap() + .forEach( + (timePartitionSlot, count) -> + result.merge(timePartitionSlot, count, Integer::sum))); + return result; + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index 47216cc41ce..def3cc07fd6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; @@ -33,10 +34,12 @@ import java.io.InputStream; import 
java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Objects; +import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,20 +48,24 @@ import java.util.stream.Collectors; public class SeriesPartitionTable { - private final Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap; + private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap; public SeriesPartitionTable() { - this.seriesPartitionMap = new ConcurrentHashMap<>(); + this.seriesPartitionMap = new TreeMap<>(); } public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap) { - this.seriesPartitionMap = seriesPartitionMap; + this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap); } public Map<TTimePartitionSlot, List<TConsensusGroupId>> getSeriesPartitionMap() { return seriesPartitionMap; } + public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) { + seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>()).add(groupId); + } + /** * Thread-safely get DataPartition within the specific StorageGroup * @@ -119,30 +126,14 @@ public class SeriesPartitionTable { } /** - * Checks whether the specified DataPartition has a predecessor or successor and returns if it - * does + * Check and return the specified DataPartition's successor * * @param timePartitionSlot Corresponding TimePartitionSlot - * @param timePartitionInterval Time partition interval - * @return The specific DataPartition's predecessor if exists, null otherwise + * @return The specified DataPartition's successor if exists, null otherwise */ - public TConsensusGroupId getAdjacentDataPartition( - TTimePartitionSlot timePartitionSlot, 
long timePartitionInterval) { - if (timePartitionSlot.getStartTime() >= timePartitionInterval) { - // Check predecessor first - TTimePartitionSlot predecessorSlot = - new TTimePartitionSlot(timePartitionSlot.getStartTime() - timePartitionInterval); - TConsensusGroupId predecessor = - seriesPartitionMap.getOrDefault(predecessorSlot, Collections.singletonList(null)).get(0); - if (predecessor != null) { - return predecessor; - } - } - - // Check successor - TTimePartitionSlot successorSlot = - new TTimePartitionSlot(timePartitionSlot.getStartTime() + timePartitionInterval); - return seriesPartitionMap.getOrDefault(successorSlot, Collections.singletonList(null)).get(0); + public TConsensusGroupId getSuccessorDataPartition(TTimePartitionSlot timePartitionSlot) { + TTimePartitionSlot successorSlot = seriesPartitionMap.higherKey(timePartitionSlot); + return successorSlot == null ? null : seriesPartitionMap.get(successorSlot).get(0); } /** @@ -225,6 +216,84 @@ public class SeriesPartitionTable { return result; } + /** + * Get the max TimePartitionSlot of the specified Database. + * + * @return The max TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMaxTimePartitionSlot() { + try { + return seriesPartitionMap.lastKey(); + } catch (NoSuchElementException e) { + return null; + } + } + + /** + * Get the min TimePartitionSlot of the specified Database. + * + * @return The min TimePartitionSlot, null if there are no DataPartitions yet + */ + public TTimePartitionSlot getMinTimePartitionSlot() { + try { + return seriesPartitionMap.firstKey(); + } catch (NoSuchElementException e) { + return null; + } + } + + /** + * Get the DataPartition with max TimePartition of the specified Database and the + * SeriesPartitionSlot. 
+ * + * @return The last DataPartition, null if there are no DataPartitions + */ + public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() { + try { + Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = + seriesPartitionMap.lastEntry(); + return new Pair<>( + lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1)); + } catch (NoSuchElementException e) { + return null; + } + } + + /** + * Check whether the specified DataPartition exists + * + * @param timePartitionSlot Corresponding TimePartitionSlot + * @return 1 if exists, 0 otherwise + */ + public int isDataPartitionExist(TTimePartitionSlot timePartitionSlot) { + return seriesPartitionMap.containsKey(timePartitionSlot) ? 1 : 0; + } + + /** + * Get the last DataPartition's ConsensusGroupId + * + * @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet + */ + public TConsensusGroupId getLastConsensusGroupId() { + try { + return seriesPartitionMap.lastEntry().getValue().get(0); + } catch (NoSuchElementException e) { + return null; + } + } + + /** + * Get the number of DataPartitions in each TimePartitionSlot + * + * @return Map<TimePartitionSlot, the number of DataPartitions> + */ + public Map<TTimePartitionSlot, Integer> getTimeSlotCountMap() { + Map<TTimePartitionSlot, Integer> result = new HashMap<>(); + seriesPartitionMap.forEach( + (timePartitionSlot, consensusGroupIds) -> result.put(timePartitionSlot, 1)); + return result; + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java new file mode 100644 index 00000000000..f1ecdc3a5fb --- /dev/null +++ 
/**
 * A key-value container that can also report a key whose value is currently the smallest.
 *
 * <p>Two views are kept in sync: {@code keyValueMap} maps each key to its current value, and
 * {@code valueKeyMap} maps each value to the set of keys that currently hold it, ordered by the
 * natural ordering of the values.
 *
 * <p>The class is backed by a plain {@link HashMap} and {@link TreeMap} and performs no
 * synchronization of its own.
 *
 * @param <K> type of the keys
 * @param <V> type of the values, ordered by their natural ordering
 */
public class BalanceTreeMap<K, V extends Comparable<V>> {

  /** Current value of every key. */
  private final HashMap<K, V> keyValueMap;

  /** Reverse index: every value in use, mapped to the keys that currently hold it. */
  private final TreeMap<V, Set<K>> valueKeyMap;

  /** Creates an empty map. */
  public BalanceTreeMap() {
    this.keyValueMap = new HashMap<>();
    this.valueKeyMap = new TreeMap<>();
  }

  /**
   * Put or modify a key-value pair.
   *
   * @param key Key
   * @param value Value, must not be null
   * @throws NullPointerException if {@code value} is null; the map is left unchanged in that case
   */
  public void put(K key, V value) {
    // Reject null before touching either view. The sorted reverse index cannot hold a null
    // value, and failing only after keyValueMap was updated would leave the views inconsistent.
    if (value == null) {
      throw new NullPointerException("value must not be null");
    }

    // Update keyValueMap and learn the previous value in a single lookup
    V oldValue = keyValueMap.put(key, value);

    // Update valueKeyMap: detach the key from its previous value first
    if (oldValue != null) {
      Set<K> oldKeys = valueKeyMap.get(oldValue);
      oldKeys.remove(key);
      if (oldKeys.isEmpty()) {
        // Drop empty buckets so that the first entry always holds at least one key
        valueKeyMap.remove(oldValue);
      }
    }
    valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
  }

  /**
   * Get the value of the specified key.
   *
   * @param key Key
   * @return The value currently associated with the key, null if the key is absent
   */
  public V get(K key) {
    return keyValueMap.get(key);
  }

  /**
   * Check whether the specified key is present.
   *
   * @param key Key
   * @return true if the key has been put, false otherwise
   */
  public boolean containsKey(K key) {
    return keyValueMap.containsKey(key);
  }

  /**
   * Get key with minimum value.
   *
   * <p>When several keys share the minimum value, which of them is returned is unspecified
   * because the keys of one value are kept in a {@link HashSet}.
   *
   * @return Key with minimum value, null if the map is empty
   */
  public K getKeyWithMinValue() {
    // TreeMap#firstEntry returns null on an empty map, so guard before dereferencing it
    return valueKeyMap.isEmpty() ? null : valueKeyMap.firstEntry().getValue().iterator().next();
  }
}
