This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch Computing-resource-balancing_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b2dd4d1b124aed464e365d8b082ddaeda3307c69 Author: YongzaoDan <[email protected]> AuthorDate: Mon Jul 31 16:28:26 2023 +0800 resolve conversations --- .../partition/IoTDBPartitionInheritPolicyIT.java | 5 ++ .../iotdb/confignode/manager/load/LoadManager.java | 4 +- .../manager/load/balancer/PartitionBalancer.java | 27 ++++--- .../persistence/partition/PartitionInfo.java | 3 +- .../impl/schema/DeleteDatabaseProcedure.java | 4 +- .../commons/partition/DataPartitionEntry.java | 87 ---------------------- .../iotdb/commons/structure/BalanceTreeMap.java | 38 ++++++---- .../commons/partition/DataPartitionEntryTest.java | 58 --------------- 8 files changed, 49 insertions(+), 177 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java index bdb233950e2..2efd5db7a0b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) @@ -170,6 +171,7 @@ public class IoTDBPartitionInheritPolicyIT { counter.forEach((groupId, num) -> Assert.assertEquals(expectedPartitionNum2, num.intValue())); // Test DataPartition inherit policy + AtomicInteger inheritedSeriesSlotNum = new AtomicInteger(0); Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new ConcurrentHashMap<>(); dataPartitionTableResp .getDataPartitionTable() @@ -193,9 +195,12 @@ public class IoTDBPartitionInheritPolicyIT { // The DataRegionGroup has been inherited Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot)); Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot), groupId); + inheritedSeriesSlotNum.incrementAndGet(); } dataAllotTable2.put(seriesPartitionSlot, groupId); })); + // Exactly half of the SeriesSlots are inherited + Assert.assertEquals(testSeriesSlotNum / 2, inheritedSeriesSlotNum.get()); // Test3: historical DataPartitions will inherit successor Random random = new Random(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e3472e2e0f2..b4c291c97e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -152,8 +152,8 @@ public class LoadManager { partitionBalancer.clearPartitionBalancer(); } - public void clearPartitionBalancer() { - partitionBalancer.clearPartitionBalancer(); + public void clearDataPartitionPolicyTable(String database) { + partitionBalancer.clearDataPartitionPolicyTable(database); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index 42ac541655c..78be7046de5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,11 +56,11 @@ public class PartitionBalancer { private final IManager configManager; // Map<DatabaseName, DataPartitionPolicyTable> - private final Map<String, DataPartitionPolicyTable> dataAllotTableMap; + private final Map<String, DataPartitionPolicyTable> dataPartitionPolicyTableMap; public PartitionBalancer(IManager configManager) { this.configManager = configManager; - this.dataAllotTableMap = new ConcurrentHashMap<>(); + this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>(); } /** @@ -71,7 +72,7 @@ public class PartitionBalancer { public Map<String, SchemaPartitionTable> allocateSchemaPartition( Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) throws NoAvailableRegionGroupException { - Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>(); + Map<String, SchemaPartitionTable> result = new HashMap<>(); for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry : unassignedSchemaPartitionSlotsMap.entrySet()) { @@ -89,7 +90,7 @@ public class PartitionBalancer { } // Enumerate SeriesPartitionSlot - Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new ConcurrentHashMap<>(); + Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new HashMap<>(); for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) { // Greedy allocation: allocate the unassigned SchemaPartition to // the RegionGroup whose allocated SchemaPartitions is the least @@ -112,7 +113,7 @@ public class PartitionBalancer { public Map<String, DataPartitionTable> allocateDataPartition( Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap) throws NoAvailableRegionGroupException { - Map<String, DataPartitionTable> result = new ConcurrentHashMap<>(); + Map<String, DataPartitionTable> result = new HashMap<>(); for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry : unassignedDataPartitionSlotsMap.entrySet()) { @@ -130,7 +131,7 @@ public class PartitionBalancer { counter.put(pair.getRight(), pair.getLeft().intValue()); } - DataPartitionPolicyTable allotTable = dataAllotTableMap.get(database); + DataPartitionPolicyTable allotTable = dataPartitionPolicyTableMap.get(database); allotTable.acquireLock(); DataPartitionTable dataPartitionTable = new DataPartitionTable(); @@ -205,7 +206,7 @@ public class PartitionBalancer { */ public void reBalanceDataPartitionPolicy(String database) { try { - dataAllotTableMap + dataPartitionPolicyTableMap .computeIfAbsent(database, empty -> new DataPartitionPolicyTable()) .reBalanceDataPartitionPolicy( getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion)); @@ -216,13 +217,13 @@ public class PartitionBalancer { /** Set up the PartitionBalancer when the current ConfigNode becomes leader. */ public void setupPartitionBalancer() { - dataAllotTableMap.clear(); + dataPartitionPolicyTableMap.clear(); getClusterSchemaManager() .getDatabaseNames() .forEach( database -> { - dataAllotTableMap.put(database, new DataPartitionPolicyTable()); - DataPartitionPolicyTable dataPartitionPolicyTable = dataAllotTableMap.get(database); + DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable(); + dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable); try { // Put all DataRegionGroups into the DataPartitionPolicyTable dataPartitionPolicyTable.reBalanceDataPartitionPolicy( @@ -239,7 +240,11 @@ public class PartitionBalancer { /** Clear the PartitionBalancer when the current ConfigNode is no longer the leader. */ public void clearPartitionBalancer() { - dataAllotTableMap.clear(); + dataPartitionPolicyTableMap.clear(); + } + + public void clearDataPartitionPolicyTable(String database) { + dataPartitionPolicyTableMap.remove(database); } private ClusterSchemaManager getClusterSchemaManager() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 66ab605e066..d61396a4c7a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -79,7 +79,6 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -832,7 +831,7 @@ public class PartitionInfo implements SnapshotProcessor { if (isDatabaseExisted(database)) { return databasePartitionTables.get(database).getLastDataAllotTable(); } - return new HashMap<>(); + return Collections.emptyMap(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 864f7a3c186..7ea774cc33c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -206,7 +206,9 @@ public class DeleteDatabaseProcedure LOG.info( "[DeleteDatabaseProcedure] Database: {} is deleted successfully", deleteDatabaseSchema.getName()); - env.getConfigManager().getLoadManager().clearPartitionBalancer(); + env.getConfigManager() + .getLoadManager() + .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName()); return Flow.NO_MORE_STATE; } else if (getCycles() > RETRY_THRESHOLD) { setFailure( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java deleted file mode 100644 index e94f6eed085..00000000000 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.partition; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; - -import java.security.SecureRandom; -import java.util.Objects; - -public class DataPartitionEntry implements Comparable<DataPartitionEntry> { - - private final TSeriesPartitionSlot seriesPartitionSlot; - private final TTimePartitionSlot timePartitionSlot; - private final TConsensusGroupId dataRegionGroup; - private final int weight; - - public DataPartitionEntry( - TSeriesPartitionSlot seriesPartitionSlot, - TTimePartitionSlot timePartitionSlot, - TConsensusGroupId dataRegionGroup) { - this.seriesPartitionSlot = seriesPartitionSlot; - this.timePartitionSlot = timePartitionSlot; - this.dataRegionGroup = dataRegionGroup; - this.weight = new SecureRandom().nextInt(); - } - - public TSeriesPartitionSlot getSeriesPartitionSlot() { - return seriesPartitionSlot; - } - - public TTimePartitionSlot getTimePartitionSlot() { - return timePartitionSlot; - } - - public TConsensusGroupId getDataRegionGroup() { - return dataRegionGroup; - } - - @Override - public int compareTo(DataPartitionEntry o) { - // The timePartitionSlot will be in descending order - // After invoke Collections.sort() - if (!timePartitionSlot.equals(o.timePartitionSlot)) { - return o.timePartitionSlot.compareTo(timePartitionSlot); - } - return Integer.compare(weight, o.weight); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DataPartitionEntry that = (DataPartitionEntry) o; - return weight == that.weight - && seriesPartitionSlot.equals(that.seriesPartitionSlot) - && timePartitionSlot.equals(that.timePartitionSlot) - && dataRegionGroup.equals(that.dataRegionGroup); - } - - @Override - public int hashCode() { - return Objects.hash(seriesPartitionSlot, timePartitionSlot, dataRegionGroup, weight); - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java index 09a49898635..1fe5ce1bcdf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java @@ -26,14 +26,21 @@ import java.util.HashSet; import java.util.Set; import java.util.TreeMap; +/** + * This class is used to store key-value pairs. It supports the following operations: 1. Put a + * key-value pair. 2. Get key with minimum value. + * + * @param <K> The type of Key + * @param <V> The type of Value, should be Comparable + */ public class BalanceTreeMap<K, V extends Comparable<V>> { private final HashMap<K, V> keyValueMap; - private final TreeMap<V, Set<K>> valueKeyMap; + private final TreeMap<V, Set<K>> valueKeysMap; public BalanceTreeMap() { this.keyValueMap = new HashMap<>(); - this.valueKeyMap = new TreeMap<>(); + this.valueKeysMap = new TreeMap<>(); } /** @@ -43,19 +50,18 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { * @param value Value */ public void put(K key, V value) { - V oldValue = keyValueMap.getOrDefault(key, null); - // Update keyValueMap - keyValueMap.put(key, value); + V oldValue = keyValueMap.put(key, value); // Update valueKeyMap if (oldValue != null) { - valueKeyMap.get(oldValue).remove(key); - if (valueKeyMap.get(oldValue).isEmpty()) { - valueKeyMap.remove(oldValue); + Set<K> keysSet = valueKeysMap.get(oldValue); + keysSet.remove(key); + if (keysSet.isEmpty()) { + valueKeysMap.remove(oldValue); } } - valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key); + valueKeysMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key); } /** @@ -64,7 +70,7 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { * @return Key with minimum value */ public K getKeyWithMinValue() { - return valueKeyMap.firstEntry().getValue().iterator().next(); + return valueKeysMap.firstEntry().getValue().iterator().next(); } public V get(K key) { @@ -81,18 +87,18 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { @TestOnly public void remove(K key) { - V value = keyValueMap.getOrDefault(key, null); + V value = keyValueMap.remove(key); if (value != null) { - keyValueMap.remove(key); - valueKeyMap.get(value).remove(key); - if (valueKeyMap.get(value).isEmpty()) { - valueKeyMap.remove(value); + Set<K> keysSet = valueKeysMap.get(value); + keysSet.remove(key); + if (keysSet.isEmpty()) { + valueKeysMap.remove(value); } } } @TestOnly public boolean isEmpty() { - return keyValueMap.isEmpty() && valueKeyMap.isEmpty(); + return keyValueMap.isEmpty() && valueKeysMap.isEmpty(); } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java deleted file mode 100644 index c92d4564b76..00000000000 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.commons.partition; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; -import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.conf.CommonDescriptor; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class DataPartitionEntryTest { - - private static final int SERIES_SLOT_NUM = 1000; - private static final long TIME_PARTITION_INTERVAL = - CommonDescriptor.getInstance().getConfig().getTimePartitionInterval(); - - @Test - public void testOrder() { - List<DataPartitionEntry> entries = new ArrayList<>(); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - entries.add( - new DataPartitionEntry( - new TSeriesPartitionSlot(i), - new TTimePartitionSlot(TIME_PARTITION_INTERVAL * i), - new TConsensusGroupId(TConsensusGroupType.DataRegion, i))); - } - - List<DataPartitionEntry> sortedEntries = new ArrayList<>(entries); - Collections.sort(sortedEntries); - for (int i = 0; i < SERIES_SLOT_NUM; i++) { - Assert.assertEquals(entries.get(SERIES_SLOT_NUM - i - 1), sortedEntries.get(i)); - } - } -}
