This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch Computing-resource-balancing_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa03eac9f0bc160a6103e76672946c0cc6eedd34 Author: YongzaoDan <[email protected]> AuthorDate: Fri Jul 21 20:42:05 2023 +0800 Pass UT --- .../manager/load/balancer/PartitionBalancer.java | 4 +- .../load/balancer/partition/DataAllotTable.java | 41 +++-- .../partition/DatabasePartitionTable.java | 13 +- .../statemachine/CreateRegionGroupsProcedure.java | 35 ++-- .../balancer/partition/DataAllotTableTest.java | 177 +++++++++++++++++++++ .../commons/partition/SeriesPartitionTable.java | 18 +-- .../iotdb/commons/structure/BalanceTreeMap.java | 33 +++- .../commons/structure/BalanceTreeMapTest.java | 82 ++++++++++ 8 files changed, 349 insertions(+), 54 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java index f0d408e91cd..963ec39edd8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java @@ -243,7 +243,9 @@ public class PartitionBalancer { */ public void updateDataAllotTable(String database) { TTimePartitionSlot currentTimePartition = - dataAllotTableMap.get(database).getCurrentTimePartition(); + dataAllotTableMap + .computeIfAbsent(database, empty -> new DataAllotTable()) + .getCurrentTimePartition(); Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>(); for (int i = 0; i < SERIES_SLOT_NUM; i++) { TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java index f334f490888..0507272f6c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class DataAllotTable { @@ -102,16 +103,26 @@ public class DataAllotTable { Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>(); for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) { + if (allocatedTable.containsKey(seriesPartitionSlot)) { + // If the SeriesSlot has already been allocated, keep the allocation + newAllotTable.put(seriesPartitionSlot, allocatedTable.get(seriesPartitionSlot)); + continue; + } + TConsensusGroupId oldRegionGroupId = dataAllotTable.get(seriesPartitionSlot); - if (counter.get(oldRegionGroupId) < mu) { + if (oldRegionGroupId != null + && counter.containsKey(oldRegionGroupId) + && counter.get(oldRegionGroupId) < mu) { // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average newAllotTable.put(seriesPartitionSlot, oldRegionGroupId); - } else { - // Otherwise, choose the regionGroup with the least slotNum to keep load balance - TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue(); - newAllotTable.put(seriesPartitionSlot, newRegionGroupId); - counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1); + counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1); + continue; } + + // Otherwise, choose the regionGroup with the least slotNum to keep load balance + TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue(); + newAllotTable.put(seriesPartitionSlot, newRegionGroupId); + counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1); } dataAllotTable.clear(); @@ -128,6 +139,7 @@ public class DataAllotTable { * @return whether the current time partition is updated */ public boolean updateCurrentTimePartition(int regionGroupNum) { + int threshold = timePartitionThreshold(regionGroupNum); dataAllotTableLock.writeLock().lock(); try { AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE); @@ -135,23 +147,18 @@ public class DataAllotTable { (timePartition, counter) -> { // Select the maximum TimePartition whose slotNum is greater than the following equation // Ensure that the remaining slots can be still distributed to new regionGroups - if (counter.get() >= timePartitionThreshold(regionGroupNum) - && timePartition.getStartTime() > newStartTime.get()) { + if (counter.get() >= threshold && timePartition.getStartTime() > newStartTime.get()) { newStartTime.set(timePartition.getStartTime()); } }); if (newStartTime.get() > currentTimePartition.get().getStartTime()) { currentTimePartition.set(new TTimePartitionSlot(newStartTime.get())); - dataPartitionCounter - .keySet() - .forEach( - timePartition -> { - // Remove the useless TimePartitions - if (timePartition.getStartTime() < newStartTime.get()) { - dataPartitionCounter.remove(timePartition); - } - }); + List<TTimePartitionSlot> removeTimePartitionSlots = + dataPartitionCounter.keySet().stream() + .filter(timePartition -> timePartition.getStartTime() < newStartTime.get()) + .collect(Collectors.toList()); + removeTimePartitionSlots.forEach(dataPartitionCounter::remove); return true; } } finally { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index c1516e6f7c9..9782356b595 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -229,16 +229,9 @@ public class DatabasePartitionTable { * @return List of TConsensusGroupId */ public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType type) { - List<TConsensusGroupId> result = new Vector<>(); - regionGroupMap - .values() - .forEach( - regionGroup -> { - if (regionGroup.getId().getType().equals(type)) { - result.add(regionGroup.getId()); - } - }); - return result; + return regionGroupMap.keySet().stream() + .filter(regionGroupId -> regionGroupId.getType().equals(type)) + .collect(Collectors.toList()); } public int getAssignedSeriesPartitionSlotsCount() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java index 3072746b378..279a5e8c872 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.procedure.impl.statemachine; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; @@ -52,6 +53,7 @@ public class CreateRegionGroupsProcedure private TConsensusGroupType consensusGroupType; private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan(); + private CreateRegionGroupsPlan persistPlan; /** key: TConsensusGroupId value: Failed RegionReplicas */ private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets = new HashMap<>(); @@ -64,6 +66,7 @@ public class CreateRegionGroupsProcedure TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) { this.consensusGroupType = consensusGroupType; this.createRegionGroupsPlan = createRegionGroupsPlan; + this.persistPlan = new CreateRegionGroupsPlan(); } @TestOnly @@ -74,6 +77,7 @@ public class CreateRegionGroupsProcedure this.consensusGroupType = consensusGroupType; this.createRegionGroupsPlan = createRegionGroupsPlan; this.failedRegionReplicaSets = failedRegionReplicaSets; + this.persistPlan = new CreateRegionGroupsPlan(); } @Override @@ -84,7 +88,7 @@ public class CreateRegionGroupsProcedure setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS); break; case SHUNT_REGION_REPLICAS: - CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan(); + persistPlan = new CreateRegionGroupsPlan(); OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan(); // Filter those RegionGroups that created successfully createRegionGroupsPlan @@ -201,12 +205,15 @@ public class CreateRegionGroupsProcedure setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH); break; case CREATE_REGION_GROUPS_FINISH: - // Update all corresponding DataAllotTables - createRegionGroupsPlan - .getRegionGroupMap() - .keySet() - .forEach( - database -> env.getConfigManager().getLoadManager().updateDataAllotTable(database)); + if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) { + // Update all corresponding DataAllotTables + persistPlan + .getRegionGroupMap() + .keySet() + .forEach( + database -> + env.getConfigManager().getLoadManager().updateDataAllotTable(database)); + } return Flow.NO_MORE_STATE; } @@ -248,6 +255,7 @@ public class CreateRegionGroupsProcedure ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream); ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream); }); + persistPlan.serializeForProcedure(stream); } @Override @@ -265,6 +273,7 @@ public class CreateRegionGroupsProcedure ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer); failedRegionReplicaSets.put(groupId, replica); } + persistPlan.deserializeForProcedure(byteBuffer); } catch (Exception e) { LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e); throw new RuntimeException(e); @@ -273,16 +282,22 @@ public class CreateRegionGroupsProcedure @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o; return consensusGroupType == that.consensusGroupType && createRegionGroupsPlan.equals(that.createRegionGroupsPlan) + && persistPlan.equals(that.persistPlan) && failedRegionReplicaSets.equals(that.failedRegionReplicaSets); } @Override public int hashCode() { - return Objects.hash(consensusGroupType, createRegionGroupsPlan, failedRegionReplicaSets); + return Objects.hash( + consensusGroupType, createRegionGroupsPlan, persistPlan, failedRegionReplicaSets); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java new file mode 100644 index 00000000000..45ee77cecb1 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +public class DataAllotTableTest { + + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum(); + + @Test + public void testUpdateCurrentTimePartition() { + final int regionGroupNum = 5; + final int threshold = DataAllotTable.timePartitionThreshold(regionGroupNum); + final long timePartitionInterval = 1000; + DataAllotTable dataAllotTable = new DataAllotTable(); + + // Test 1: currentTimePartition is the first one + TTimePartitionSlot nextTimePartition = new TTimePartitionSlot(1000); + Map<TTimePartitionSlot, Integer> timePartitionCountMap = new HashMap<>(); + timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold); + timePartitionCountMap.put( + new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval), + threshold - 100); + timePartitionCountMap.put( + new TTimePartitionSlot(nextTimePartition.getStartTime() + 2 * timePartitionInterval), + threshold - 200); + dataAllotTable.addTimePartitionCount(timePartitionCountMap); + dataAllotTable.updateCurrentTimePartition(regionGroupNum); + Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); + + // Test 2: currentTimePartition in the middle + timePartitionCountMap.clear(); + nextTimePartition = new TTimePartitionSlot(5000); + timePartitionCountMap.put( + new TTimePartitionSlot(nextTimePartition.getStartTime() - timePartitionInterval), + threshold - 100); + timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold); + timePartitionCountMap.put( + new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval), + threshold - 100); + dataAllotTable.addTimePartitionCount(timePartitionCountMap); + dataAllotTable.updateCurrentTimePartition(regionGroupNum); + Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); + + // Test 3: currentTimePartition will be the maximum timePartitionSlot that greater or equal to + // threshold + int offset = 200; + Random random = new Random(); + timePartitionCountMap.clear(); + TTimePartitionSlot baseSlot = new TTimePartitionSlot(10000); + nextTimePartition = baseSlot; + timePartitionCountMap.put(nextTimePartition, threshold); + for (int i = 1; i < 100; i++) { + TTimePartitionSlot slot = + new TTimePartitionSlot(baseSlot.getStartTime() + i * timePartitionInterval); + int count = threshold + random.nextInt(offset) - offset / 2; + timePartitionCountMap.put(slot, count); + if (count >= threshold) { + nextTimePartition = slot; + } + } + dataAllotTable.addTimePartitionCount(timePartitionCountMap); + dataAllotTable.updateCurrentTimePartition(regionGroupNum); + Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition()); + } + + @Test + public void testUpdateDataAllotTable() { + DataAllotTable dataAllotTable = new DataAllotTable(); + List<TConsensusGroupId> dataRegionGroups = new ArrayList<>(); + + // Test 1: construct DataAllotTable from scratch + TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1); + dataRegionGroups.add(group1); + dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>()); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + // All SeriesPartitionSlots belong to group1 + Assert.assertEquals(group1, dataAllotTable.getRegionGroupId(seriesPartitionSlot)); + } + + // Test2: extend DataRegionGroups + Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>(); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2)); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3)); + dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>()); + int mu = SERIES_SLOT_NUM / 3; + Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot); + lastDataAllotTable.put(seriesPartitionSlot, groupId); + counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + } + // All DataRegionGroups divide SeriesPartitionSlots evenly + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { + Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); + } + + // Test 3: extend DataRegionGroups while inherit future allocate result + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4)); + dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5)); + Random random = new Random(); + Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new HashMap<>(); + Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>(); + for (int i = 0; i < 50; i++) { + TSeriesPartitionSlot seriesPartitionSlot = + new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); + while (allocatedTable.containsKey(seriesPartitionSlot)) { + seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM)); + } + allocatedTable.put( + seriesPartitionSlot, + new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4)); + } + dataAllotTable.updateDataAllotTable(dataRegionGroups, allocatedTable); + mu = SERIES_SLOT_NUM / 5; + counter.clear(); + for (int i = 0; i < SERIES_SLOT_NUM; i++) { + TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i); + TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot); + counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + if (groupId.getId() < 4) { + // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged + Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), groupId); + unchangedSlots.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet(); + } + } + // All DataRegionGroups divide SeriesPartitionSlots evenly + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) { + Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1); + } + // All SeriesPartitionSlots that have been allocated before should be allocated to the same + // DataRegionGroup + allocatedTable.forEach( + (seriesPartitionSlot, groupId) -> + Assert.assertEquals(groupId, dataAllotTable.getRegionGroupId(seriesPartitionSlot))); + // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged + for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) { + Assert.assertEquals(mu, counterEntry.getValue().get()); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index def3cc07fd6..00edd8bcb89 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -249,14 +249,13 @@ public class SeriesPartitionTable { * @return The last DataPartition, null if there are no DataPartitions */ public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() { - try { - Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = - seriesPartitionMap.lastEntry(); - return new Pair<>( - lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1)); - } catch (NoSuchElementException e) { + Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = + seriesPartitionMap.lastEntry(); + if (lastEntry == null) { return null; } + return new Pair<>( + lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1)); } /** @@ -275,11 +274,12 @@ public class SeriesPartitionTable { * @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet */ public TConsensusGroupId getLastConsensusGroupId() { - try { - return seriesPartitionMap.lastEntry().getValue().get(0); - } catch (NoSuchElementException e) { + Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry = + seriesPartitionMap.lastEntry(); + if (lastEntry == null) { return null; } + return lastEntry.getValue().get(lastEntry.getValue().size() - 1); } /** diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java index f1ecdc3a5fb..1ea29c9a80b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.structure; +import org.apache.iotdb.commons.utils.TestOnly; + import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -56,6 +58,15 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key); } + /** + * Get key with minimum value. + * + * @return Key with minimum value + */ + public K getKeyWithMinValue() { + return valueKeyMap.firstEntry().getValue().iterator().next(); + } + public V get(K key) { return keyValueMap.getOrDefault(key, null); } @@ -64,12 +75,20 @@ public class BalanceTreeMap<K, V extends Comparable<V>> { return keyValueMap.containsKey(key); } - /** - * Get key with minimum value. - * - * @return Key with minimum value - */ - public K getKeyWithMinValue() { - return valueKeyMap.firstEntry().getValue().iterator().next(); + @TestOnly + public void remove(K key) { + V value = keyValueMap.getOrDefault(key, null); + if (value != null) { + keyValueMap.remove(key); + valueKeyMap.get(value).remove(key); + if (valueKeyMap.get(value).isEmpty()) { + valueKeyMap.remove(value); + } + } + } + + @TestOnly + public boolean isEmpty() { + return keyValueMap.isEmpty() && valueKeyMap.isEmpty(); } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java new file mode 100644 index 00000000000..d38b8176f26 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.structure; + +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class BalanceTreeMapTest { + + @Test + public void testGetKeyWithMinValue() { + Random random = new Random(); + BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>(); + for (int i = 0; i < 100; i++) { + balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE)); + } + TSeriesPartitionSlot minSlot = new TSeriesPartitionSlot(100); + balanceTreeMap.put(minSlot, Integer.MIN_VALUE); + for (int i = 101; i < 200; i++) { + balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE)); + } + Assert.assertEquals(minSlot, balanceTreeMap.getKeyWithMinValue()); + + int currentValue = Integer.MIN_VALUE; + for (int i = 0; i < 200; i++) { + TSeriesPartitionSlot slot = balanceTreeMap.getKeyWithMinValue(); + Assert.assertTrue(balanceTreeMap.get(slot) >= currentValue); + currentValue = balanceTreeMap.get(slot); + balanceTreeMap.remove(slot); + } + } + + @Test + public void testKeysDuplicate() { + BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>(); + Set<TSeriesPartitionSlot> duplicateSet0 = new HashSet<>(); + for (int i = 0; i < 10; i++) { + TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i); + balanceTreeMap.put(slot, 0); + duplicateSet0.add(slot); + } + Set<TSeriesPartitionSlot> duplicateSet1 = new HashSet<>(); + for (int i = 10; i < 20; i++) { + TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i); + balanceTreeMap.put(slot, 1); + duplicateSet1.add(slot); + } + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(duplicateSet0.contains(balanceTreeMap.getKeyWithMinValue())); + balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue()); + } + for (int i = 0; i < 10; i++) { + Assert.assertTrue(duplicateSet1.contains(balanceTreeMap.getKeyWithMinValue())); + balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue()); + } + Assert.assertTrue(balanceTreeMap.isEmpty()); + } +}
