This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/concurrent-fastutil-maps in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5932e34a88610e2958d49313bfefe4c54175c98c Author: Yongqiang YANG <[email protected]> AuthorDate: Sat Mar 14 05:10:34 2026 -0700 [Enhancement](memory) Add ConcurrentLong2ObjectHashMap and ConcurrentLong2LongHashMap Add thread-safe primitive-key concurrent hash maps built on fastutil, designed to replace ConcurrentHashMap<Long, V> and ConcurrentHashMap<Long, Long> in memory-sensitive FE paths. These maps eliminate Long autoboxing overhead and reduce per-entry memory from ~64 bytes (ConcurrentHashMap) to ~16 bytes, a 4x improvement. Key design: - Segment-based locking (default 16 segments) for concurrent throughput - Full Map interface compatibility for drop-in replacement - Atomic putIfAbsent, computeIfAbsent, replace, remove operations - Comprehensive unit tests covering CRUD, concurrency, iteration, edge cases, and default value semantics Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../doris/common/ConcurrentLong2LongHashMap.java | 486 +++++++++++++++++++++ .../doris/common/ConcurrentLong2ObjectHashMap.java | 443 +++++++++++++++++++ .../common/ConcurrentLong2LongHashMapTest.java | 455 +++++++++++++++++++ .../common/ConcurrentLong2ObjectHashMapTest.java | 432 ++++++++++++++++++ 4 files changed, 1816 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java new file mode 100644 index 00000000000..0cb8de92dcd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java @@ -0,0 +1,486 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongFunction; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongBinaryOperator; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; + +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongUnaryOperator; + +/** + * A concurrent map with primitive long keys and primitive long values, backed by segmented + * {@link Long2LongOpenHashMap} instances with {@link ReentrantReadWriteLock} per segment. + * + * <p>This class saves ~48 bytes per entry compared to {@code ConcurrentHashMap<Long, Long>} + * by avoiding boxing of both keys and values. For fields like partition update row counts + * with millions of entries, this translates to hundreds of MB of heap savings. + * + * <p>The {@link #addTo(long, long)} method provides atomic increment semantics, useful for + * counter patterns. + * + * <p><b>Important:</b> All compound operations from both {@link Long2LongMap} and {@link Map} + * interfaces are overridden to ensure atomicity within a segment's write lock. + */ +public class ConcurrentLong2LongHashMap extends AbstractLong2LongMap { + + private static final int DEFAULT_SEGMENT_COUNT = 16; + private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16; + + private final Segment[] segments; + private final int segmentMask; + private final int segmentBits; + + public ConcurrentLong2LongHashMap() { + this(DEFAULT_SEGMENT_COUNT); + } + + public ConcurrentLong2LongHashMap(int segmentCount) { + if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) { + throw new IllegalArgumentException("segmentCount must be a positive power of 2: " + segmentCount); + } + this.segmentBits = Integer.numberOfTrailingZeros(segmentCount); + this.segmentMask = segmentCount - 1; + this.segments = new Segment[segmentCount]; + for (int i = 0; i < segmentCount; i++) { + segments[i] = new Segment(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT); + } + } + + private Segment segmentFor(long key) { + return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask]; + } + + // ---- Read operations (read-lock) ---- + + @Override + public long get(long key) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.get(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public long getOrDefault(long key, long defaultValue) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.getOrDefault(key, defaultValue); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsKey(long key) { + Segment seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.containsKey(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsValue(long value) { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + if (seg.map.containsValue(value)) { + return true; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return false; + } + + @Override + public int size() { + long total = 0; + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + total += seg.map.size(); + } finally { + seg.lock.readLock().unlock(); + } + } + return (int) Math.min(total, Integer.MAX_VALUE); + } + + @Override + public boolean isEmpty() { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + if (!seg.map.isEmpty()) { + return false; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return true; + } + + // ---- Write operations (write-lock) ---- + + @Override + public long put(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.put(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long remove(long key) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.remove(key); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long putIfAbsent(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.putIfAbsent(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public boolean replace(long key, long oldValue, long newValue) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, oldValue, newValue); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long replace(long key, long value) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public void clear() { + for (Segment seg : segments) { + seg.lock.writeLock().lock(); + try { + seg.map.clear(); + } finally { + seg.lock.writeLock().unlock(); + } + } + } + + @Override + public void putAll(Map<? extends Long, ? extends Long> m) { + for (Map.Entry<? extends Long, ? extends Long> entry : m.entrySet()) { + put(entry.getKey().longValue(), entry.getValue().longValue()); + } + } + + // ---- Atomic compound operations ---- + // Override ALL compound methods from both Long2LongMap and Map interfaces. + + /** + * Atomically adds the given increment to the value associated with the key. + * If the key is not present, the entry is created with the increment as value + * (starting from defaultReturnValue, which is 0L by default). + * + * @return the new value after the increment + */ + public long addTo(long key, long increment) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + long newValue = seg.map.addTo(key, increment) + increment; + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfAbsent(long key, LongUnaryOperator mappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(key)) { + return seg.map.get(key); + } + long newValue = mappingFunction.applyAsLong(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfAbsent(long key, Long2LongFunction mappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(key)) { + return seg.map.get(key); + } + long newValue = mappingFunction.get(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public Long computeIfAbsent(Long key, Function<? super Long, ? extends Long> mappingFunction) { + long k = key.longValue(); + Segment seg = segmentFor(k); + seg.lock.writeLock().lock(); + try { + if (seg.map.containsKey(k)) { + return seg.map.get(k); + } + Long newValue = mappingFunction.apply(key); + if (newValue != null) { + seg.map.put(k, newValue.longValue()); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long computeIfPresent(long key, + BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + return defaultReturnValue(); + } + long oldValue = seg.map.get(key); + Long newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else { + seg.map.remove(key); + return defaultReturnValue(); + } + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long compute(long key, BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + Long oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null; + Long newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else if (oldValue != null) { + seg.map.remove(key); + } + return defaultReturnValue(); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long merge(long key, long value, + BiFunction<? super Long, ? super Long, ? extends Long> remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + seg.map.put(key, value); + return value; + } + long oldValue = seg.map.get(key); + Long newValue = remappingFunction.apply(oldValue, value); + if (newValue != null) { + seg.map.put(key, newValue.longValue()); + return newValue; + } else { + seg.map.remove(key); + return defaultReturnValue(); + } + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public long mergeLong(long key, long value, java.util.function.LongBinaryOperator remappingFunction) { + Segment seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + if (!seg.map.containsKey(key)) { + seg.map.put(key, value); + return value; + } + long oldValue = seg.map.get(key); + long newValue = remappingFunction.applyAsLong(oldValue, value); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + // ---- Iteration (weakly consistent snapshots) ---- + + @Override + public ObjectSet<Long2LongMap.Entry> long2LongEntrySet() { + ObjectOpenHashSet<Long2LongMap.Entry> snapshot = new ObjectOpenHashSet<>(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) { + snapshot.add(new AbstractLong2LongMap.BasicEntry(entry.getLongKey(), entry.getLongValue())); + } + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + @Override + public LongSet keySet() { + LongOpenHashSet snapshot = new LongOpenHashSet(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Returns the keys as a {@link LongArrayList}. + */ + public LongArrayList keyList() { + LongArrayList list = new LongArrayList(size()); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + list.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return list; + } + + @Override + public it.unimi.dsi.fastutil.longs.LongCollection values() { + LongArrayList snapshot = new LongArrayList(); + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.values()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Applies the given action to each entry under read-lock per segment. + */ + public void forEach(LongLongConsumer action) { + for (Segment seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2LongMap.Entry entry : seg.map.long2LongEntrySet()) { + action.accept(entry.getLongKey(), entry.getLongValue()); + } + } finally { + seg.lock.readLock().unlock(); + } + } + } + + @FunctionalInterface + public interface LongLongConsumer { + void accept(long key, long value); + } + + // ---- Segment inner class ---- + + private static final class Segment { + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + final Long2LongOpenHashMap map; + + Segment(int initialCapacity) { + this.map = new Long2LongOpenHashMap(initialCapacity); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java new file mode 100644 index 00000000000..a599d9f712c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectFunction; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectCollection; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; + +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongFunction; + +/** + * A concurrent map with primitive long keys and object values, backed by segmented + * {@link Long2ObjectOpenHashMap} instances with {@link ReentrantReadWriteLock} per segment. + * + * <p>This class provides similar concurrency guarantees to {@link java.util.concurrent.ConcurrentHashMap} + * while avoiding the memory overhead of boxing long keys. For a cluster with millions of tablet entries, + * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, V>}. + * + * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, {@link #values()}) + * return snapshot copies and are weakly consistent. + * + * <p><b>Important:</b> All compound operations (computeIfAbsent, computeIfPresent, compute, merge) + * from both {@link Long2ObjectMap} and {@link Map} interfaces are overridden to ensure atomicity + * within a segment. The default interface implementations would call get/put as separate locked + * operations, breaking atomicity. + * + * @param <V> the type of mapped values + */ +public class ConcurrentLong2ObjectHashMap<V> extends AbstractLong2ObjectMap<V> { + + private static final int DEFAULT_SEGMENT_COUNT = 16; + private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16; + + private final Segment<V>[] segments; + private final int segmentMask; + private final int segmentBits; + + public ConcurrentLong2ObjectHashMap() { + this(DEFAULT_SEGMENT_COUNT); + } + + @SuppressWarnings("unchecked") + public ConcurrentLong2ObjectHashMap(int segmentCount) { + if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) { + throw new IllegalArgumentException("segmentCount must be a positive power of 2: " + segmentCount); + } + this.segmentBits = Integer.numberOfTrailingZeros(segmentCount); + this.segmentMask = segmentCount - 1; + this.segments = new Segment[segmentCount]; + for (int i = 0; i < segmentCount; i++) { + segments[i] = new Segment<>(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT); + } + } + + private Segment<V> segmentFor(long key) { + return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask]; + } + + // ---- Read operations (read-lock) ---- + + @Override + public V get(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.get(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public V getOrDefault(long key, V defaultValue) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.getOrDefault(key, defaultValue); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsKey(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.readLock().lock(); + try { + return seg.map.containsKey(key); + } finally { + seg.lock.readLock().unlock(); + } + } + + @Override + public boolean containsValue(Object value) { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + if (seg.map.containsValue(value)) { + return true; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return false; + } + + @Override + public int size() { + long total = 0; + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + total += seg.map.size(); + } finally { + seg.lock.readLock().unlock(); + } + } + return (int) Math.min(total, Integer.MAX_VALUE); + } + + @Override + public boolean isEmpty() { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + if (!seg.map.isEmpty()) { + return false; + } + } finally { + seg.lock.readLock().unlock(); + } + } + return true; + } + + // ---- Write operations (write-lock) ---- + + @Override + public V put(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.put(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V remove(long key) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.remove(key); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V putIfAbsent(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.putIfAbsent(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public boolean replace(long key, V oldValue, V newValue) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, oldValue, newValue); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V replace(long key, V value) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + return seg.map.replace(key, value); + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public void clear() { + for (Segment<V> seg : segments) { + seg.lock.writeLock().lock(); + try { + seg.map.clear(); + } finally { + seg.lock.writeLock().unlock(); + } + } + } + + @Override + public void putAll(Map<? extends Long, ? extends V> m) { + for (Map.Entry<? extends Long, ? extends V> entry : m.entrySet()) { + put(entry.getKey().longValue(), entry.getValue()); + } + } + + // ---- Atomic compound operations ---- + // Override ALL compound methods from both Long2ObjectMap and Map interfaces + // to ensure the check-then-act is atomic within a segment's write lock. + + @Override + public V computeIfAbsent(long key, LongFunction<? extends V> mappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V val = seg.map.get(key); + if (val != null || seg.map.containsKey(key)) { + return val; + } + V newValue = mappingFunction.apply(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V computeIfAbsent(long key, Long2ObjectFunction<? extends V> mappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V val = seg.map.get(key); + if (val != null || seg.map.containsKey(key)) { + return val; + } + V newValue = mappingFunction.get(key); + seg.map.put(key, newValue); + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V computeIfAbsent(Long key, Function<? super Long, ? extends V> mappingFunction) { + return computeIfAbsent(key.longValue(), (long k) -> mappingFunction.apply(k)); + } + + @Override + public V computeIfPresent(long key, BiFunction<? super Long, ? super V, ? extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.get(key); + if (oldValue != null || seg.map.containsKey(key)) { + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue); + } else { + seg.map.remove(key); + } + return newValue; + } + return null; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V compute(long key, BiFunction<? super Long, ? super V, ? extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.containsKey(key) ? seg.map.get(key) : null; + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + seg.map.put(key, newValue); + } else if (seg.map.containsKey(key)) { + seg.map.remove(key); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + @Override + public V merge(long key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { + Segment<V> seg = segmentFor(key); + seg.lock.writeLock().lock(); + try { + V oldValue = seg.map.get(key); + V newValue; + if (oldValue != null || seg.map.containsKey(key)) { + newValue = remappingFunction.apply(oldValue, value); + } else { + newValue = value; + } + if (newValue != null) { + seg.map.put(key, newValue); + } else { + seg.map.remove(key); + } + return newValue; + } finally { + seg.lock.writeLock().unlock(); + } + } + + // ---- Iteration (weakly consistent snapshots) ---- + + @Override + public ObjectSet<Long2ObjectMap.Entry<V>> long2ObjectEntrySet() { + ObjectOpenHashSet<Long2ObjectMap.Entry<V>> snapshot = new ObjectOpenHashSet<>(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2ObjectMap.Entry<V> entry : seg.map.long2ObjectEntrySet()) { + snapshot.add(new AbstractLong2ObjectMap.BasicEntry<>(entry.getLongKey(), entry.getValue())); + } + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + @Override + public LongSet keySet() { + LongOpenHashSet snapshot = new LongOpenHashSet(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Returns the keys as a {@link LongArrayList}. Useful when callers need indexed access + * or will iterate the keys once. Snapshot-based and weakly consistent. + */ + public LongArrayList keyList() { + LongArrayList list = new LongArrayList(size()); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + list.addAll(seg.map.keySet()); + } finally { + seg.lock.readLock().unlock(); + } + } + return list; + } + + @Override + public ObjectCollection<V> values() { + ObjectArrayList<V> snapshot = new ObjectArrayList<>(); + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + snapshot.addAll(seg.map.values()); + } finally { + seg.lock.readLock().unlock(); + } + } + return snapshot; + } + + /** + * Applies the given action to each entry under read-lock per segment. + * This is more efficient than iterating {@link #long2ObjectEntrySet()} as it avoids + * creating a snapshot. + */ + public void forEach(LongObjConsumer<? super V> action) { + for (Segment<V> seg : segments) { + seg.lock.readLock().lock(); + try { + for (Long2ObjectMap.Entry<V> entry : seg.map.long2ObjectEntrySet()) { + action.accept(entry.getLongKey(), entry.getValue()); + } + } finally { + seg.lock.readLock().unlock(); + } + } + } + + @FunctionalInterface + public interface LongObjConsumer<V> { + void accept(long key, V value); + } + + // ---- Segment inner class ---- + + private static final class Segment<V> { + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + final Long2ObjectOpenHashMap<V> map; + + Segment(int initialCapacity) { + this.map = new Long2ObjectOpenHashMap<>(initialCapacity); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java new file mode 100644 index 00000000000..4bc749d1501 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import com.google.gson.Gson; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectSet; +import org.apache.doris.persist.gson.GsonUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +class ConcurrentLong2LongHashMapTest { + + @Test + void testPutAndGet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.get(1L)); + map.put(1L, 200L); + Assertions.assertEquals(200L, map.get(1L)); + } + + @Test + void testGetMissingKeyReturnsDefaultReturnValue() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + // Default return value is 0L + Assertions.assertEquals(0L, map.get(999L)); + } + + @Test + void testGetOrDefault() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.getOrDefault(1L, -1L)); + Assertions.assertEquals(-1L, map.getOrDefault(2L, -1L)); + } + + @Test + void testRemove() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + Assertions.assertEquals(100L, map.remove(1L)); + Assertions.assertFalse(map.containsKey(1L)); + // Remove non-existent key returns defaultReturnValue + Assertions.assertEquals(0L, map.remove(1L)); + } + + @Test + void testContainsKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertFalse(map.containsKey(1L)); + map.put(1L, 0L); + Assertions.assertTrue(map.containsKey(1L)); + } + + @Test + void testContainsValue() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + Assertions.assertTrue(map.containsValue(100L)); + Assertions.assertFalse(map.containsValue(300L)); + } + + @Test + void testSizeAndIsEmpty() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + map.put(1L, 100L); + map.put(2L, 200L); + Assertions.assertFalse(map.isEmpty()); + Assertions.assertEquals(2, map.size()); + } + + @Test + void testClear() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + map.clear(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + } + + @Test + void testPutAll() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Map<Long, Long> source = new HashMap<>(); + source.put(1L, 100L); + source.put(2L, 200L); + source.put(3L, 300L); + map.putAll(source); + Assertions.assertEquals(3, map.size()); + Assertions.assertEquals(200L, map.get(2L)); + } + + @Test + void testPutIfAbsent() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + Assertions.assertEquals(0L, map.putIfAbsent(1L, 100L)); + Assertions.assertEquals(100L, map.putIfAbsent(1L, 200L)); + Assertions.assertEquals(100L, map.get(1L)); + } + + @Test + void testComputeIfAbsent() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + long val = map.computeIfAbsent(1L, k -> k * 10); + Assertions.assertEquals(10L, val); + // Should not recompute + long val2 = map.computeIfAbsent(1L, k -> k * 20); + Assertions.assertEquals(10L, val2); + } + + // ---- addTo tests ---- + + @Test + void testAddToNewKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + long result = map.addTo(1L, 5L); + Assertions.assertEquals(5L, result); + Assertions.assertEquals(5L, map.get(1L)); + } + + @Test + void testAddToExistingKey() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 10L); + long result = map.addTo(1L, 5L); + Assertions.assertEquals(15L, result); + Assertions.assertEquals(15L, map.get(1L)); + } + + @Test + void testAddToNegative() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 10L); + long result = map.addTo(1L, -3L); + Assertions.assertEquals(7L, result); + } + + // ---- Iteration tests ---- + + @Test + void testEntrySet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + + ObjectSet<Long2LongMap.Entry> entries = map.long2LongEntrySet(); + Assertions.assertEquals(2, entries.size()); + + Set<Long> keys = new HashSet<>(); + for (Long2LongMap.Entry entry : entries) { + keys.add(entry.getLongKey()); + } + Assertions.assertTrue(keys.contains(1L)); + Assertions.assertTrue(keys.contains(2L)); + } + + @Test + void testKeySet() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(10L, 100L); + map.put(20L, 200L); + LongSet keys = map.keySet(); + Assertions.assertEquals(2, keys.size()); + Assertions.assertTrue(keys.contains(10L)); + Assertions.assertTrue(keys.contains(20L)); + } + + @Test + void testValues() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + it.unimi.dsi.fastutil.longs.LongCollection values = map.values(); + Assertions.assertEquals(2, values.size()); + Assertions.assertTrue(values.contains(100L)); + Assertions.assertTrue(values.contains(200L)); + } + + @Test + void testForEach() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(1L, 100L); + map.put(2L, 200L); + Map<Long, Long> collected = new HashMap<>(); + map.forEach(collected::put); + Assertions.assertEquals(2, collected.size()); + Assertions.assertEquals(100L, (long) collected.get(1L)); + } + + // ---- Large map test ---- + + @Test + void testLargeMap() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int count = 100_000; + for (long i = 0; i < count; i++) { + map.put(i, i * 3); + } + Assertions.assertEquals(count, map.size()); + for (long i = 0; i < count; i++) { + Assertions.assertEquals(i * 3, map.get(i)); + } + } + + @Test + void testCustomSegmentCount() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(4); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + Assertions.assertEquals(1000, map.size()); + } + + @Test + void testInvalidSegmentCount() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2LongHashMap(3)); + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2LongHashMap(0)); + } + + // ---- Concurrency tests ---- + + @Test + void testConcurrentPuts() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 8; + int keysPerThread = 10_000; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + for (int i = 0; i < keysPerThread; i++) { + long key = (long) threadId * keysPerThread + i; + map.put(key, key * 2); + } + latch.countDown(); + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(threads * keysPerThread, map.size()); + } + + @Test + void testConcurrentAddTo() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 16; + int incrementsPerThread = 10_000; + long key = 42L; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + executor.submit(() -> { + for (int i = 0; i < incrementsPerThread; i++) { + map.addTo(key, 1L); + } + latch.countDown(); + }); + } + latch.await(); + executor.shutdown(); + + Assertions.assertEquals((long) threads * incrementsPerThread, map.get(key)); + } + + @Test + void testConcurrentReadWrite() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + + int threads = 8; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 5000; i++) { + long key = i % 1000; + if (threadId % 2 == 0) { + map.get(key); + map.containsKey(key); + } else { + map.put(key + 1000L * threadId, (long) i); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + @Test + void testConcurrentComputeIfAbsent() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + int threads = 16; + long sharedKey = 42L; + AtomicInteger computeCount = new AtomicInteger(); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<Long>> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + futures.add(executor.submit(() -> + map.computeIfAbsent(sharedKey, k -> { + computeCount.incrementAndGet(); + return k * 10; + }) + )); + } + Set<Long> results = new HashSet<>(); + for (Future<Long> f : futures) { + results.add(f.get()); + } + executor.shutdown(); + + Assertions.assertEquals(1, results.size()); + Assertions.assertTrue(results.contains(420L)); + Assertions.assertEquals(1, computeCount.get()); + } + + @Test + void testConcurrentIterationDuringModification() throws Exception { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + for (long i = 0; i < 1000; i++) { + map.put(i, i); + } + + int threads = 4; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 100; i++) { + if (threadId % 2 == 0) { + map.keySet(); + map.values(); + map.long2LongEntrySet(); + } else { + map.put(1000L + threadId * 100 + i, (long) i); + map.remove((long) (i % 500)); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + // ---- Gson serialization tests ---- + + @Test + void testGsonRoundTrip() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + map.put(100L, 1000L); + map.put(200L, 2000L); + + String json = GsonUtils.GSON.toJson(map); + + ConcurrentLong2LongHashMap deserialized = GsonUtils.GSON.fromJson(json, ConcurrentLong2LongHashMap.class); + + Assertions.assertEquals(2, deserialized.size()); + Assertions.assertEquals(1000L, deserialized.get(100L)); + Assertions.assertEquals(2000L, deserialized.get(200L)); + } + + @Test + void testGsonFormatCompatibleWithConcurrentHashMap() { + ConcurrentHashMap<Long, Long> chm = new ConcurrentHashMap<>(); + chm.put(1L, 100L); + chm.put(2L, 200L); + String chmJson = new Gson().toJson(chm); + + ConcurrentLong2LongHashMap fastMap = new ConcurrentLong2LongHashMap(); + fastMap.put(1L, 100L); + fastMap.put(2L, 200L); + String fastJson = GsonUtils.GSON.toJson(fastMap); + + Gson gson = new Gson(); + Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class); + Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class); + Assertions.assertEquals(chmParsed, fastParsed); + } + + @Test + void testDefaultReturnValueBehavior() { + ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(); + // Primitive get returns 0L (defaultReturnValue) for missing keys + Assertions.assertEquals(0L, map.get(999L)); + + // Store 0L explicitly + map.put(1L, 0L); + Assertions.assertTrue(map.containsKey(1L)); + Assertions.assertEquals(0L, map.get(1L)); + + // Boxed get via Map<Long,Long> interface returns null for missing keys + Long boxedResult = map.getOrDefault(999L, map.defaultReturnValue()); + Assertions.assertEquals(0L, boxedResult); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java new file mode 100644 index 00000000000..88f1a45e7e3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.LongSet; +import it.unimi.dsi.fastutil.objects.ObjectCollection; +import it.unimi.dsi.fastutil.objects.ObjectSet; +import org.apache.doris.persist.gson.GsonUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +class ConcurrentLong2ObjectHashMapTest { + + @Test + void testPutAndGet() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Assertions.assertNull(map.put(1L, "one")); + Assertions.assertEquals("one", map.get(1L)); + Assertions.assertEquals("one", map.put(1L, "ONE")); + Assertions.assertEquals("ONE", map.get(1L)); + } + + @Test + void testGetMissingKey() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Assertions.assertNull(map.get(999L)); + } + + @Test + void testGetOrDefault() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + Assertions.assertEquals("one", map.getOrDefault(1L, "default")); + Assertions.assertEquals("default", map.getOrDefault(2L, "default")); + } + + @Test + void testRemove() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + Assertions.assertEquals("one", map.remove(1L)); + Assertions.assertNull(map.get(1L)); + Assertions.assertNull(map.remove(1L)); + } + + @Test + void testContainsKey() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Assertions.assertFalse(map.containsKey(1L)); + map.put(1L, "one"); + Assertions.assertTrue(map.containsKey(1L)); + } + + @Test + void testContainsValue() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + map.put(2L, "two"); + Assertions.assertTrue(map.containsValue("one")); + Assertions.assertFalse(map.containsValue("three")); + } + + @Test + void testSizeAndIsEmpty() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + map.put(1L, "one"); + map.put(2L, "two"); + Assertions.assertFalse(map.isEmpty()); + Assertions.assertEquals(2, map.size()); + } + + @Test + void testClear() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + map.put(2L, "two"); + map.clear(); + Assertions.assertTrue(map.isEmpty()); + Assertions.assertEquals(0, map.size()); + } + + @Test + void testPutAll() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Map<Long, String> source = new HashMap<>(); + source.put(1L, "one"); + source.put(2L, "two"); + source.put(3L, "three"); + map.putAll(source); + Assertions.assertEquals(3, map.size()); + Assertions.assertEquals("two", map.get(2L)); + } + + @Test + void testPutIfAbsent() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + Assertions.assertNull(map.putIfAbsent(1L, "one")); + Assertions.assertEquals("one", map.putIfAbsent(1L, "ONE")); + Assertions.assertEquals("one", map.get(1L)); + } + + @Test + void testComputeIfAbsent() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + String val = map.computeIfAbsent(1L, k -> "computed-" + k); + Assertions.assertEquals("computed-1", val); + // Should not recompute + String val2 = map.computeIfAbsent(1L, k -> "recomputed-" + k); + Assertions.assertEquals("computed-1", val2); + } + + @Test + void testComputeIfPresent() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + // Not present — should return null + Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> v + "-updated")); + + map.put(1L, "one"); + String val = map.computeIfPresent(1L, (k, v) -> v + "-updated"); + Assertions.assertEquals("one-updated", val); + Assertions.assertEquals("one-updated", map.get(1L)); + + // Return null to remove + Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> null)); + Assertions.assertFalse(map.containsKey(1L)); + } + + @Test + void testEntrySet() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + map.put(2L, "two"); + + ObjectSet<Long2ObjectMap.Entry<String>> entries = map.long2ObjectEntrySet(); + Assertions.assertEquals(2, entries.size()); + + Set<Long> keys = new HashSet<>(); + for (Long2ObjectMap.Entry<String> entry : entries) { + keys.add(entry.getLongKey()); + } + Assertions.assertTrue(keys.contains(1L)); + Assertions.assertTrue(keys.contains(2L)); + } + + @Test + void testKeySet() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(10L, "ten"); + map.put(20L, "twenty"); + LongSet keys = map.keySet(); + Assertions.assertEquals(2, keys.size()); + Assertions.assertTrue(keys.contains(10L)); + Assertions.assertTrue(keys.contains(20L)); + } + + @Test + void testValues() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + map.put(2L, "two"); + ObjectCollection<String> values = map.values(); + Assertions.assertEquals(2, values.size()); + Assertions.assertTrue(values.contains("one")); + Assertions.assertTrue(values.contains("two")); + } + + @Test + void testStream() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + for (long i = 0; i < 100; i++) { + map.put(i, "val-" + i); + } + long count = map.values().stream().filter(v -> v.startsWith("val-")).count(); + Assertions.assertEquals(100, count); + } + + @Test + void testForEach() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, "one"); + map.put(2L, "two"); + Map<Long, String> collected = new HashMap<>(); + map.forEach(collected::put); + Assertions.assertEquals(2, collected.size()); + Assertions.assertEquals("one", collected.get(1L)); + } + + @Test + void testNullValues() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(1L, null); + Assertions.assertTrue(map.containsKey(1L)); + Assertions.assertNull(map.get(1L)); + } + + @Test + void testLargeMap() { + ConcurrentLong2ObjectHashMap<Long> map = new ConcurrentLong2ObjectHashMap<>(); + int count = 100_000; + for (long i = 0; i < count; i++) { + map.put(i, i * 2); + } + Assertions.assertEquals(count, map.size()); + for (long i = 0; i < count; i++) { + Assertions.assertEquals(Long.valueOf(i * 2), map.get(i)); + } + } + + @Test + void testCustomSegmentCount() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(4); + for (long i = 0; i < 1000; i++) { + map.put(i, "v" + i); + } + Assertions.assertEquals(1000, map.size()); + } + + @Test + void testInvalidSegmentCount() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(3)); + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(0)); + Assertions.assertThrows(IllegalArgumentException.class, () -> new ConcurrentLong2ObjectHashMap<>(-1)); + } + + // ---- Concurrency tests ---- + + @Test + void testConcurrentPuts() throws Exception { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + int threads = 8; + int keysPerThread = 10_000; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + for (int i = 0; i < keysPerThread; i++) { + long key = (long) threadId * keysPerThread + i; + map.put(key, "t" + threadId + "-" + i); + } + latch.countDown(); + }); + } + latch.await(); + executor.shutdown(); + + Assertions.assertEquals(threads * keysPerThread, map.size()); + } + + @Test + void testConcurrentReadWrite() throws Exception { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + // Pre-populate + for (long i = 0; i < 1000; i++) { + map.put(i, "v" + i); + } + + int threads = 8; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 5000; i++) { + long key = i % 1000; + if (threadId % 2 == 0) { + // Reader + map.get(key); + map.containsKey(key); + } else { + // Writer + map.put(key + 1000L * threadId, "new-" + i); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + @Test + void testConcurrentComputeIfAbsent() throws Exception { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + int threads = 16; + long sharedKey = 42L; + AtomicInteger computeCount = new AtomicInteger(); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<String>> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + futures.add(executor.submit(() -> + map.computeIfAbsent(sharedKey, k -> { + computeCount.incrementAndGet(); + return "computed"; + }) + )); + } + Set<String> results = new HashSet<>(); + for (Future<String> f : futures) { + results.add(f.get()); + } + executor.shutdown(); + + // All threads should get the same value + Assertions.assertEquals(1, results.size()); + Assertions.assertTrue(results.contains("computed")); + // The function should have been called exactly once + Assertions.assertEquals(1, computeCount.get()); + } + + @Test + void testConcurrentIterationDuringModification() throws Exception { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + for (long i = 0; i < 1000; i++) { + map.put(i, "v" + i); + } + + int threads = 4; + ExecutorService executor = Executors.newFixedThreadPool(threads); + CountDownLatch latch = new CountDownLatch(threads); + AtomicInteger errors = new AtomicInteger(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < 100; i++) { + if (threadId % 2 == 0) { + // Iterator - should not throw + map.keySet(); + map.values(); + map.long2ObjectEntrySet(); + } else { + // Modifier + map.put(1000L + threadId * 100 + i, "new"); + map.remove((long) (i % 500)); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + executor.shutdown(); + Assertions.assertEquals(0, errors.get()); + } + + // ---- Gson serialization tests ---- + + @Test + void testGsonRoundTrip() { + ConcurrentLong2ObjectHashMap<String> map = new ConcurrentLong2ObjectHashMap<>(); + map.put(100L, "hundred"); + map.put(200L, "two-hundred"); + + String json = GsonUtils.GSON.toJson(map); + + Type type = new TypeToken<ConcurrentLong2ObjectHashMap<String>>() {}.getType(); + ConcurrentLong2ObjectHashMap<String> deserialized = GsonUtils.GSON.fromJson(json, type); + + Assertions.assertEquals(2, deserialized.size()); + Assertions.assertEquals("hundred", deserialized.get(100L)); + Assertions.assertEquals("two-hundred", deserialized.get(200L)); + } + + @Test + void testGsonFormatCompatibleWithConcurrentHashMap() { + // Verify the JSON format matches what ConcurrentHashMap<Long, String> produces + ConcurrentHashMap<Long, String> chm = new ConcurrentHashMap<>(); + chm.put(1L, "one"); + chm.put(2L, "two"); + String chmJson = new Gson().toJson(chm); + + ConcurrentLong2ObjectHashMap<String> fastMap = new ConcurrentLong2ObjectHashMap<>(); + fastMap.put(1L, "one"); + fastMap.put(2L, "two"); + String fastJson = GsonUtils.GSON.toJson(fastMap); + + // Both should be parseable as the same JSON object + Gson gson = new Gson(); + Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class); + Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class); + Assertions.assertEquals(chmParsed, fastParsed); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
