Copilot commented on code in PR #61332:
URL: https://github.com/apache/doris/pull/61332#discussion_r2935224695


##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongFunction;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongBinaryOperator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
+
+/**
+ * A concurrent map with primitive long keys and primitive long values, backed 
by segmented
+ * {@link Long2LongOpenHashMap} instances with {@link ReentrantReadWriteLock} 
per segment.
+ *
+ * <p>This class saves ~48 bytes per entry compared to {@code 
ConcurrentHashMap<Long, Long>}
+ * by avoiding boxing of both keys and values. For fields like partition 
update row counts
+ * with millions of entries, this translates to hundreds of MB of heap savings.
+ *
+ * <p>The {@link #addTo(long, long)} method provides atomic increment 
semantics, useful for
+ * counter patterns.
+ *
+ * <p><b>Important:</b> All compound operations from both {@link Long2LongMap} 
and {@link Map}
+ * interfaces are overridden to ensure atomicity within a segment's write lock.

Review Comment:
   Similar to the object map: the Javadoc claims all compound operations from 
both `Long2LongMap` and `Map` are overridden for atomicity, but boxed 
`Map`-level compound methods like `putIfAbsent(Long,Long)`, 
`remove(Object,Object)`, `computeIfPresent(Long,...)`, `merge(Long,...)`, etc. 
are not implemented in this class. Either add explicit overrides that lock once 
per segment, or narrow the documentation so it matches actual coverage.
   



##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongFunction;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongBinaryOperator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;

Review Comment:
   Unused imports `it.unimi.dsi.fastutil.longs.LongBinaryOperator` and 
`it.unimi.dsi.fastutil.objects.ObjectArrayList` will fail the FE checkstyle 
`UnusedImports` rule; please remove them (the code uses 
`java.util.function.LongBinaryOperator` fully-qualified and never uses 
`ObjectArrayList`).
   



##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java:
##########
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectFunction;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A concurrent map with primitive long keys and object values, backed by 
segmented
+ * {@link Long2ObjectOpenHashMap} instances with {@link 
ReentrantReadWriteLock} per segment.
+ *
+ * <p>This class provides similar concurrency guarantees to {@link 
java.util.concurrent.ConcurrentHashMap}
+ * while avoiding the memory overhead of boxing long keys. For a cluster with 
millions of tablet entries,
+ * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, 
V>}.
+ *
+ * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, 
{@link #values()})
+ * return snapshot copies and are weakly consistent.
+ *
+ * <p><b>Important:</b> All compound operations (computeIfAbsent, 
computeIfPresent, compute, merge)
+ * from both {@link Long2ObjectMap} and {@link Map} interfaces are overridden 
to ensure atomicity
+ * within a segment. The default interface implementations would call get/put 
as separate locked
+ * operations, breaking atomicity.
+ *
+ * @param <V> the type of mapped values
+ */
+public class ConcurrentLong2ObjectHashMap<V> extends AbstractLong2ObjectMap<V> 
{
+
+    private static final int DEFAULT_SEGMENT_COUNT = 16;
+    private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16;
+
+    private final Segment<V>[] segments;
+    private final int segmentMask;
+    private final int segmentBits;
+
+    public ConcurrentLong2ObjectHashMap() {
+        this(DEFAULT_SEGMENT_COUNT);
+    }
+
+    @SuppressWarnings("unchecked")
+    public ConcurrentLong2ObjectHashMap(int segmentCount) {
+        if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) {
+            throw new IllegalArgumentException("segmentCount must be a 
positive power of 2: " + segmentCount);
+        }
+        this.segmentBits = Integer.numberOfTrailingZeros(segmentCount);
+        this.segmentMask = segmentCount - 1;
+        this.segments = new Segment[segmentCount];
+        for (int i = 0; i < segmentCount; i++) {
+            segments[i] = new Segment<>(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT);
+        }
+    }
+
+    private Segment<V> segmentFor(long key) {
+        return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & 
segmentMask];
+    }
+
+    // ---- Read operations (read-lock) ----
+
+    @Override
+    public V get(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.get(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public V getOrDefault(long key, V defaultValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.getOrDefault(key, defaultValue);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsKey(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.containsKey(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (seg.map.containsValue(value)) {
+                    return true;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int size() {
+        long total = 0;
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                total += seg.map.size();
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return (int) Math.min(total, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (!seg.map.isEmpty()) {
+                    return false;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return true;
+    }
+
+    // ---- Write operations (write-lock) ----
+
+    @Override
+    public V put(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.put(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V remove(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.remove(key);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V putIfAbsent(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.putIfAbsent(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean replace(long key, V oldValue, V newValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, oldValue, newValue);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V replace(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        for (Segment<V> seg : segments) {
+            seg.lock.writeLock().lock();
+            try {
+                seg.map.clear();
+            } finally {
+                seg.lock.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends Long, ? extends V> m) {
+        for (Map.Entry<? extends Long, ? extends V> entry : m.entrySet()) {
+            put(entry.getKey().longValue(), entry.getValue());
+        }
+    }
+
+    // ---- Atomic compound operations ----
+    // Override ALL compound methods from both Long2ObjectMap and Map 
interfaces
+    // to ensure the check-then-act is atomic within a segment's write lock.
+
+    @Override
+    public V computeIfAbsent(long key, LongFunction<? extends V> 
mappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V val = seg.map.get(key);
+            if (val != null || seg.map.containsKey(key)) {
+                return val;
+            }
+            V newValue = mappingFunction.apply(key);
+            seg.map.put(key, newValue);
+            return newValue;

Review Comment:
   `computeIfAbsent` should not insert a mapping when the mapping function 
returns null (per the `Map.computeIfAbsent` contract). Currently a null return 
value is still `put` into the underlying map, which also makes the 
absent-vs-null-mapped semantics ambiguous.



##########
fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2LongHashMapTest.java:
##########
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import com.google.gson.Gson;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class ConcurrentLong2LongHashMapTest {
+
+    @Test
+    void testPutAndGet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.get(1L));
+        map.put(1L, 200L);
+        Assertions.assertEquals(200L, map.get(1L));
+    }
+
+    @Test
+    void testGetMissingKeyReturnsDefaultReturnValue() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        // Default return value is 0L
+        Assertions.assertEquals(0L, map.get(999L));
+    }
+
+    @Test
+    void testGetOrDefault() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.getOrDefault(1L, -1L));
+        Assertions.assertEquals(-1L, map.getOrDefault(2L, -1L));
+    }
+
+    @Test
+    void testRemove() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        Assertions.assertEquals(100L, map.remove(1L));
+        Assertions.assertFalse(map.containsKey(1L));
+        // Remove non-existent key returns defaultReturnValue
+        Assertions.assertEquals(0L, map.remove(1L));
+    }
+
+    @Test
+    void testContainsKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertFalse(map.containsKey(1L));
+        map.put(1L, 0L);
+        Assertions.assertTrue(map.containsKey(1L));
+    }
+
+    @Test
+    void testContainsValue() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Assertions.assertTrue(map.containsValue(100L));
+        Assertions.assertFalse(map.containsValue(300L));
+    }
+
+    @Test
+    void testSizeAndIsEmpty() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Assertions.assertFalse(map.isEmpty());
+        Assertions.assertEquals(2, map.size());
+    }
+
+    @Test
+    void testClear() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        map.clear();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+    }
+
+    @Test
+    void testPutAll() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Map<Long, Long> source = new HashMap<>();
+        source.put(1L, 100L);
+        source.put(2L, 200L);
+        source.put(3L, 300L);
+        map.putAll(source);
+        Assertions.assertEquals(3, map.size());
+        Assertions.assertEquals(200L, map.get(2L));
+    }
+
+    @Test
+    void testPutIfAbsent() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        Assertions.assertEquals(0L, map.putIfAbsent(1L, 100L));
+        Assertions.assertEquals(100L, map.putIfAbsent(1L, 200L));
+        Assertions.assertEquals(100L, map.get(1L));
+    }
+
+    @Test
+    void testComputeIfAbsent() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        long val = map.computeIfAbsent(1L, k -> k * 10);
+        Assertions.assertEquals(10L, val);
+        // Should not recompute
+        long val2 = map.computeIfAbsent(1L, k -> k * 20);
+        Assertions.assertEquals(10L, val2);
+    }
+
+    // ---- addTo tests ----
+
+    @Test
+    void testAddToNewKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        long result = map.addTo(1L, 5L);
+        Assertions.assertEquals(5L, result);
+        Assertions.assertEquals(5L, map.get(1L));
+    }
+
+    @Test
+    void testAddToExistingKey() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 10L);
+        long result = map.addTo(1L, 5L);
+        Assertions.assertEquals(15L, result);
+        Assertions.assertEquals(15L, map.get(1L));
+    }
+
+    @Test
+    void testAddToNegative() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 10L);
+        long result = map.addTo(1L, -3L);
+        Assertions.assertEquals(7L, result);
+    }
+
+    // ---- Iteration tests ----
+
+    @Test
+    void testEntrySet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+
+        ObjectSet<Long2LongMap.Entry> entries = map.long2LongEntrySet();
+        Assertions.assertEquals(2, entries.size());
+
+        Set<Long> keys = new HashSet<>();
+        for (Long2LongMap.Entry entry : entries) {
+            keys.add(entry.getLongKey());
+        }
+        Assertions.assertTrue(keys.contains(1L));
+        Assertions.assertTrue(keys.contains(2L));
+    }
+
+    @Test
+    void testKeySet() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(10L, 100L);
+        map.put(20L, 200L);
+        LongSet keys = map.keySet();
+        Assertions.assertEquals(2, keys.size());
+        Assertions.assertTrue(keys.contains(10L));
+        Assertions.assertTrue(keys.contains(20L));
+    }
+
+    @Test
+    void testValues() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        it.unimi.dsi.fastutil.longs.LongCollection values = map.values();
+        Assertions.assertEquals(2, values.size());
+        Assertions.assertTrue(values.contains(100L));
+        Assertions.assertTrue(values.contains(200L));
+    }
+
+    @Test
+    void testForEach() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(1L, 100L);
+        map.put(2L, 200L);
+        Map<Long, Long> collected = new HashMap<>();
+        map.forEach(collected::put);
+        Assertions.assertEquals(2, collected.size());
+        Assertions.assertEquals(100L, (long) collected.get(1L));
+    }
+
+    // ---- Large map test ----
+
+    @Test
+    void testLargeMap() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int count = 100_000;
+        for (long i = 0; i < count; i++) {
+            map.put(i, i * 3);
+        }
+        Assertions.assertEquals(count, map.size());
+        for (long i = 0; i < count; i++) {
+            Assertions.assertEquals(i * 3, map.get(i));
+        }
+    }
+
+    @Test
+    void testCustomSegmentCount() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap(4);
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+        Assertions.assertEquals(1000, map.size());
+    }
+
+    @Test
+    void testInvalidSegmentCount() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2LongHashMap(3));
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
ConcurrentLong2LongHashMap(0));
+    }
+
+    // ---- Concurrency tests ----
+
+    @Test
+    void testConcurrentPuts() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 8;
+        int keysPerThread = 10_000;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                for (int i = 0; i < keysPerThread; i++) {
+                    long key = (long) threadId * keysPerThread + i;
+                    map.put(key, key * 2);
+                }
+                latch.countDown();
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(threads * keysPerThread, map.size());
+    }
+
+    @Test
+    void testConcurrentAddTo() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 16;
+        int incrementsPerThread = 10_000;
+        long key = 42L;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+
+        for (int t = 0; t < threads; t++) {
+            executor.submit(() -> {
+                for (int i = 0; i < incrementsPerThread; i++) {
+                    map.addTo(key, 1L);
+                }
+                latch.countDown();
+            });
+        }
+        latch.await();
+        executor.shutdown();
+
+        Assertions.assertEquals((long) threads * incrementsPerThread, 
map.get(key));
+    }
+
+    @Test
+    void testConcurrentReadWrite() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+
+        int threads = 8;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 5000; i++) {
+                        long key = i % 1000;
+                        if (threadId % 2 == 0) {
+                            map.get(key);
+                            map.containsKey(key);
+                        } else {
+                            map.put(key + 1000L * threadId, (long) i);
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    @Test
+    void testConcurrentComputeIfAbsent() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        int threads = 16;
+        long sharedKey = 42L;
+        AtomicInteger computeCount = new AtomicInteger();
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        List<Future<Long>> futures = new ArrayList<>();
+
+        for (int t = 0; t < threads; t++) {
+            futures.add(executor.submit(() ->
+                    map.computeIfAbsent(sharedKey, k -> {
+                        computeCount.incrementAndGet();
+                        return k * 10;
+                    })
+            ));
+        }
+        Set<Long> results = new HashSet<>();
+        for (Future<Long> f : futures) {
+            results.add(f.get());
+        }
+        executor.shutdown();
+
+        Assertions.assertEquals(1, results.size());
+        Assertions.assertTrue(results.contains(420L));
+        Assertions.assertEquals(1, computeCount.get());
+    }
+
+    @Test
+    void testConcurrentIterationDuringModification() throws Exception {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        for (long i = 0; i < 1000; i++) {
+            map.put(i, i);
+        }
+
+        int threads = 4;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        CountDownLatch latch = new CountDownLatch(threads);
+        AtomicInteger errors = new AtomicInteger();
+
+        for (int t = 0; t < threads; t++) {
+            final int threadId = t;
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < 100; i++) {
+                        if (threadId % 2 == 0) {
+                            map.keySet();
+                            map.values();
+                            map.long2LongEntrySet();
+                        } else {
+                            map.put(1000L + threadId * 100 + i, (long) i);
+                            map.remove((long) (i % 500));
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+        latch.await();
+        executor.shutdown();
+        Assertions.assertEquals(0, errors.get());
+    }
+
+    // ---- Gson serialization tests ----
+
+    @Test
+    void testGsonRoundTrip() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        map.put(100L, 1000L);
+        map.put(200L, 2000L);
+
+        String json = GsonUtils.GSON.toJson(map);
+
+        ConcurrentLong2LongHashMap deserialized = 
GsonUtils.GSON.fromJson(json, ConcurrentLong2LongHashMap.class);
+
+        Assertions.assertEquals(2, deserialized.size());
+        Assertions.assertEquals(1000L, deserialized.get(100L));
+        Assertions.assertEquals(2000L, deserialized.get(200L));
+    }
+
+    @Test
+    void testGsonFormatCompatibleWithConcurrentHashMap() {
+        ConcurrentHashMap<Long, Long> chm = new ConcurrentHashMap<>();
+        chm.put(1L, 100L);
+        chm.put(2L, 200L);
+        String chmJson = new Gson().toJson(chm);
+
+        ConcurrentLong2LongHashMap fastMap = new ConcurrentLong2LongHashMap();
+        fastMap.put(1L, 100L);
+        fastMap.put(2L, 200L);
+        String fastJson = GsonUtils.GSON.toJson(fastMap);
+
+        Gson gson = new Gson();
+        Map<?, ?> chmParsed = gson.fromJson(chmJson, Map.class);
+        Map<?, ?> fastParsed = gson.fromJson(fastJson, Map.class);
+        Assertions.assertEquals(chmParsed, fastParsed);
+    }
+
+    @Test
+    void testDefaultReturnValueBehavior() {
+        ConcurrentLong2LongHashMap map = new ConcurrentLong2LongHashMap();
+        // Primitive get returns 0L (defaultReturnValue) for missing keys
+        Assertions.assertEquals(0L, map.get(999L));
+
+        // Store 0L explicitly
+        map.put(1L, 0L);
+        Assertions.assertTrue(map.containsKey(1L));
+        Assertions.assertEquals(0L, map.get(1L));
+
+        // Boxed get via Map<Long,Long> interface returns null for missing keys
+        Long boxedResult = map.getOrDefault(999L, map.defaultReturnValue());
+        Assertions.assertEquals(0L, boxedResult);

Review Comment:
   The comment says “Boxed get via Map<Long,Long> interface returns null for 
missing keys”, but the code uses `getOrDefault(...)`, which never returns null 
(it returns the provided default). Either adjust the comment to match the code, 
or add an assertion that actually exercises `((Map<Long, Long>) map).get(999L)` 
if that null-returning behavior is important to pin down.
   



##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2LongHashMap.java:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongFunction;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongBinaryOperator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
+
+/**
+ * A concurrent map with primitive long keys and primitive long values, backed 
by segmented
+ * {@link Long2LongOpenHashMap} instances with {@link ReentrantReadWriteLock} 
per segment.
+ *
+ * <p>This class saves ~48 bytes per entry compared to {@code 
ConcurrentHashMap<Long, Long>}
+ * by avoiding boxing of both keys and values. For fields like partition 
update row counts
+ * with millions of entries, this translates to hundreds of MB of heap savings.
+ *
+ * <p>The {@link #addTo(long, long)} method provides atomic increment 
semantics, useful for
+ * counter patterns.
+ *
+ * <p><b>Important:</b> Compound operations on the primitive {@link Long2LongMap}
+ * interface are overridden to run atomically under a segment's write lock. Any
+ * boxed {@link Map}-level compound defaults that are not explicitly overridden
+ * here fall back to separate locked get/put calls and are NOT atomic.
+ */
+public class ConcurrentLong2LongHashMap extends AbstractLong2LongMap {
+
+    private static final int DEFAULT_SEGMENT_COUNT = 16;
+    private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16;
+
+    private final Segment[] segments;
+    private final int segmentMask;
+    private final int segmentBits;
+
    /** Creates a map with the default segment count (16). */
    public ConcurrentLong2LongHashMap() {
        this(DEFAULT_SEGMENT_COUNT);
    }
+
+    public ConcurrentLong2LongHashMap(int segmentCount) {
+        if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) {
+            throw new IllegalArgumentException("segmentCount must be a 
positive power of 2: " + segmentCount);
+        }
+        this.segmentBits = Integer.numberOfTrailingZeros(segmentCount);
+        this.segmentMask = segmentCount - 1;
+        this.segments = new Segment[segmentCount];
+        for (int i = 0; i < segmentCount; i++) {
+            segments[i] = new Segment(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT);
+        }
+    }
+
    /**
     * Selects the segment responsible for the given key.
     *
     * <p>Uses the TOP {@code segmentBits} bits of the mixed hash for the segment
     * index, so segment choice is decorrelated from the low bits used internally
     * by the per-segment open-hash tables. Note: when {@code segmentBits == 0}
     * the shift amount (64) is a no-op in Java, but the mask reduces the index
     * to 0, so a single-segment map is still correct.
     */
    private Segment segmentFor(long key) {
        return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask];
    }
+
+    // ---- Read operations (read-lock) ----
+
    /**
     * Returns the value associated with {@code key}, read under the segment's read lock.
     *
     * <p>For a missing key this returns the backing segment map's default return
     * value (0L unless changed).
     *
     * <p>NOTE(review): the wrapper inherits {@code defaultReturnValue(long)} from
     * {@code AbstractLong2LongMap}, but changing it is not propagated into the
     * per-segment maps, so reads here would still use each segment's own default
     * (0L) — TODO confirm and either propagate the default into all segments or
     * document it as fixed at 0.
     */
    @Override
    public long get(long key) {
        Segment seg = segmentFor(key);
        seg.lock.readLock().lock();
        try {
            return seg.map.get(key);
        } finally {
            seg.lock.readLock().unlock();
        }
    }
+
+    @Override
+    public long getOrDefault(long key, long defaultValue) {
+        Segment seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.getOrDefault(key, defaultValue);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsKey(long key) {
+        Segment seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.containsKey(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsValue(long value) {
+        for (Segment seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (seg.map.containsValue(value)) {
+                    return true;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int size() {
+        long total = 0;
+        for (Segment seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                total += seg.map.size();
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return (int) Math.min(total, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        for (Segment seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (!seg.map.isEmpty()) {
+                    return false;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return true;
+    }
+
+    // ---- Write operations (write-lock) ----
+
+    @Override
+    public long put(long key, long value) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.put(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long remove(long key) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.remove(key);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long putIfAbsent(long key, long value) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.putIfAbsent(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean replace(long key, long oldValue, long newValue) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, oldValue, newValue);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long replace(long key, long value) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        for (Segment seg : segments) {
+            seg.lock.writeLock().lock();
+            try {
+                seg.map.clear();
+            } finally {
+                seg.lock.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends Long, ? extends Long> m) {
+        for (Map.Entry<? extends Long, ? extends Long> entry : m.entrySet()) {
+            put(entry.getKey().longValue(), entry.getValue().longValue());
+        }
+    }
+
+    // ---- Atomic compound operations ----
+    // Override ALL compound methods from both Long2LongMap and Map interfaces.
+
+    /**
+     * Atomically adds the given increment to the value associated with the 
key.
+     * If the key is not present, the entry is created with the increment as 
value
+     * (starting from defaultReturnValue, which is 0L by default).
+     *
+     * @return the new value after the increment
+     */
+    public long addTo(long key, long increment) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            long newValue = seg.map.addTo(key, increment) + increment;
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long computeIfAbsent(long key, LongUnaryOperator mappingFunction) {
+        Segment seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            if (seg.map.containsKey(key)) {
+                return seg.map.get(key);
+            }
+            long newValue = mappingFunction.applyAsLong(key);
+            seg.map.put(key, newValue);
+            return newValue;
+        } finally {

Review Comment:
   `ConcurrentLong2LongHashMap` extends fastutil's function types, so callers 
can change `defaultReturnValue(...)`. The wrapper currently calls 
`seg.map.get/remove/...` directly, but each segment's `Long2LongOpenHashMap` 
defaultReturnValue remains its own (0 by default), so the wrapper's 
`defaultReturnValue()` can diverge from actual read/remove behavior. Consider 
overriding `defaultReturnValue(long)` to propagate into all segments (and 
ensure reads/removes reflect it), or explicitly document/guard that the default 
return value is fixed at 0.



##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java:
##########
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectFunction;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A concurrent map with primitive long keys and object values, backed by 
segmented
+ * {@link Long2ObjectOpenHashMap} instances with {@link 
ReentrantReadWriteLock} per segment.
+ *
+ * <p>This class provides similar concurrency guarantees to {@link 
java.util.concurrent.ConcurrentHashMap}
+ * while avoiding the memory overhead of boxing long keys. For a cluster with 
millions of tablet entries,
+ * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, 
V>}.
+ *
+ * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, 
{@link #values()})
+ * return snapshot copies and are weakly consistent.
+ *
+ * <p><b>Important:</b> All compound operations (computeIfAbsent, 
computeIfPresent, compute, merge)
+ * from both {@link Long2ObjectMap} and {@link Map} interfaces are overridden 
to ensure atomicity

Review Comment:
   The class Javadoc states that *all* compound operations from both 
`Long2ObjectMap` and `Map` are overridden for atomicity, but key `Map`-level 
compound methods (e.g. `remove(Object key, Object value)`, `putIfAbsent(Long, 
V)`, `computeIfPresent(Long, ...)`, `merge(Long, ...)`, etc.) are not 
implemented in this class. Either add explicit overrides that take the segment 
write-lock once, or narrow the documentation to avoid over-promising atomicity.
   



##########
fe/fe-core/src/main/java/org/apache/doris/common/ConcurrentLong2ObjectHashMap.java:
##########
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import it.unimi.dsi.fastutil.HashCommon;
+import it.unimi.dsi.fastutil.longs.AbstractLong2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectFunction;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/**
+ * A concurrent map with primitive long keys and object values, backed by 
segmented
+ * {@link Long2ObjectOpenHashMap} instances with {@link 
ReentrantReadWriteLock} per segment.
+ *
+ * <p>This class provides similar concurrency guarantees to {@link 
java.util.concurrent.ConcurrentHashMap}
+ * while avoiding the memory overhead of boxing long keys. For a cluster with 
millions of tablet entries,
+ * this saves ~32 bytes per entry compared to {@code ConcurrentHashMap<Long, 
V>}.
+ *
+ * <p>Iteration methods ({@link #long2ObjectEntrySet()}, {@link #keySet()}, 
{@link #values()})
+ * return snapshot copies and are weakly consistent.
+ *
+ * <p><b>Important:</b> The compound operations on the primitive
+ * {@link Long2ObjectMap} interface (computeIfAbsent, computeIfPresent, compute,
+ * merge) are overridden to run atomically under a segment's write lock; the
+ * default interface implementations would call get/put as separate locked
+ * operations, breaking atomicity. Boxed {@link Map}-level compound defaults
+ * (e.g. {@code remove(Object, Object)}, {@code merge(Long, ...)}) that are not
+ * explicitly overridden fall back to such non-atomic sequences.
+ *
+ * @param <V> the type of mapped values
+ */
+public class ConcurrentLong2ObjectHashMap<V> extends AbstractLong2ObjectMap<V> 
{
+
+    private static final int DEFAULT_SEGMENT_COUNT = 16;
+    private static final int DEFAULT_INITIAL_CAPACITY_PER_SEGMENT = 16;
+
+    private final Segment<V>[] segments;
+    private final int segmentMask;
+    private final int segmentBits;
+
    /** Creates a map with the default segment count (16). */
    public ConcurrentLong2ObjectHashMap() {
        this(DEFAULT_SEGMENT_COUNT);
    }
+
+    @SuppressWarnings("unchecked")
+    public ConcurrentLong2ObjectHashMap(int segmentCount) {
+        if (segmentCount <= 0 || (segmentCount & (segmentCount - 1)) != 0) {
+            throw new IllegalArgumentException("segmentCount must be a 
positive power of 2: " + segmentCount);
+        }
+        this.segmentBits = Integer.numberOfTrailingZeros(segmentCount);
+        this.segmentMask = segmentCount - 1;
+        this.segments = new Segment[segmentCount];
+        for (int i = 0; i < segmentCount; i++) {
+            segments[i] = new Segment<>(DEFAULT_INITIAL_CAPACITY_PER_SEGMENT);
+        }
+    }
+
    /**
     * Selects the segment responsible for the given key.
     *
     * <p>Uses the TOP {@code segmentBits} bits of the mixed hash for the segment
     * index, decorrelating segment choice from the low bits used by the
     * per-segment open-hash tables. When {@code segmentBits == 0} the shift
     * amount (64) is a no-op in Java, but the mask reduces the index to 0, so a
     * single-segment map is still correct.
     */
    private Segment<V> segmentFor(long key) {
        return segments[(int) (HashCommon.mix(key) >>> (64 - segmentBits)) & segmentMask];
    }
+
+    // ---- Read operations (read-lock) ----
+
+    @Override
+    public V get(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.get(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public V getOrDefault(long key, V defaultValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.getOrDefault(key, defaultValue);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsKey(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.readLock().lock();
+        try {
+            return seg.map.containsKey(key);
+        } finally {
+            seg.lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (seg.map.containsValue(value)) {
+                    return true;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int size() {
+        long total = 0;
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                total += seg.map.size();
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return (int) Math.min(total, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        for (Segment<V> seg : segments) {
+            seg.lock.readLock().lock();
+            try {
+                if (!seg.map.isEmpty()) {
+                    return false;
+                }
+            } finally {
+                seg.lock.readLock().unlock();
+            }
+        }
+        return true;
+    }
+
+    // ---- Write operations (write-lock) ----
+
    /**
     * Associates {@code value} with {@code key} under the segment's write lock.
     *
     * <p>NOTE(review): null values are accepted and stored here, unlike
     * {@code ConcurrentHashMap} which rejects them — confirm whether drop-in
     * compatibility requires throwing NPE on null values, or document the
     * difference explicitly.
     *
     * @return the previous value, or null if the key was absent
     */
    @Override
    public V put(long key, V value) {
        Segment<V> seg = segmentFor(key);
        seg.lock.writeLock().lock();
        try {
            return seg.map.put(key, value);
        } finally {
            seg.lock.writeLock().unlock();
        }
    }
+
+    @Override
+    public V remove(long key) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.remove(key);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V putIfAbsent(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.putIfAbsent(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean replace(long key, V oldValue, V newValue) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, oldValue, newValue);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V replace(long key, V value) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            return seg.map.replace(key, value);
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void clear() {
+        for (Segment<V> seg : segments) {
+            seg.lock.writeLock().lock();
+            try {
+                seg.map.clear();
+            } finally {
+                seg.lock.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends Long, ? extends V> m) {
+        for (Map.Entry<? extends Long, ? extends V> entry : m.entrySet()) {
+            put(entry.getKey().longValue(), entry.getValue());
+        }
+    }
+
+    // ---- Atomic compound operations ----
+    // Override ALL compound methods from both Long2ObjectMap and Map 
interfaces
+    // to ensure the check-then-act is atomic within a segment's write lock.
+
+    @Override
+    public V computeIfAbsent(long key, LongFunction<? extends V> 
mappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V val = seg.map.get(key);
+            if (val != null || seg.map.containsKey(key)) {
+                return val;
+            }
+            V newValue = mappingFunction.apply(key);
+            seg.map.put(key, newValue);
+            return newValue;
+        } finally {
+            seg.lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public V computeIfAbsent(long key, Long2ObjectFunction<? extends V> 
mappingFunction) {
+        Segment<V> seg = segmentFor(key);
+        seg.lock.writeLock().lock();
+        try {
+            V val = seg.map.get(key);
+            if (val != null || seg.map.containsKey(key)) {
+                return val;
+            }
+            V newValue = mappingFunction.get(key);
+            seg.map.put(key, newValue);
+            return newValue;

Review Comment:
   Same issue as the other overload: if `mappingFunction.get(key)` returns 
null, `computeIfAbsent` must not store the key (it should return null and leave 
the map unchanged).



##########
fe/fe-core/src/test/java/org/apache/doris/common/ConcurrentLong2ObjectHashMapTest.java:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectCollection;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class ConcurrentLong2ObjectHashMapTest {
+
+    @Test
+    void testPutAndGet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.put(1L, "one"));
+        Assertions.assertEquals("one", map.get(1L));
+        Assertions.assertEquals("one", map.put(1L, "ONE"));
+        Assertions.assertEquals("ONE", map.get(1L));
+    }
+
+    @Test
+    void testGetMissingKey() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.get(999L));
+    }
+
+    @Test
+    void testGetOrDefault() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        Assertions.assertEquals("one", map.getOrDefault(1L, "default"));
+        Assertions.assertEquals("default", map.getOrDefault(2L, "default"));
+    }
+
+    @Test
+    void testRemove() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        Assertions.assertEquals("one", map.remove(1L));
+        Assertions.assertNull(map.get(1L));
+        Assertions.assertNull(map.remove(1L));
+    }
+
+    @Test
+    void testContainsKey() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertFalse(map.containsKey(1L));
+        map.put(1L, "one");
+        Assertions.assertTrue(map.containsKey(1L));
+    }
+
+    @Test
+    void testContainsValue() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Assertions.assertTrue(map.containsValue("one"));
+        Assertions.assertFalse(map.containsValue("three"));
+    }
+
+    @Test
+    void testSizeAndIsEmpty() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Assertions.assertFalse(map.isEmpty());
+        Assertions.assertEquals(2, map.size());
+    }
+
+    @Test
+    void testClear() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        map.clear();
+        Assertions.assertTrue(map.isEmpty());
+        Assertions.assertEquals(0, map.size());
+    }
+
+    @Test
+    void testPutAll() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Map<Long, String> source = new HashMap<>();
+        source.put(1L, "one");
+        source.put(2L, "two");
+        source.put(3L, "three");
+        map.putAll(source);
+        Assertions.assertEquals(3, map.size());
+        Assertions.assertEquals("two", map.get(2L));
+    }
+
+    @Test
+    void testPutIfAbsent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        Assertions.assertNull(map.putIfAbsent(1L, "one"));
+        Assertions.assertEquals("one", map.putIfAbsent(1L, "ONE"));
+        Assertions.assertEquals("one", map.get(1L));
+    }
+
+    @Test
+    void testComputeIfAbsent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        String val = map.computeIfAbsent(1L, k -> "computed-" + k);
+        Assertions.assertEquals("computed-1", val);
+        // Should not recompute
+        String val2 = map.computeIfAbsent(1L, k -> "recomputed-" + k);
+        Assertions.assertEquals("computed-1", val2);
+    }
+
+    @Test
+    void testComputeIfPresent() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        // Not present — should return null
+        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> v + 
"-updated"));
+
+        map.put(1L, "one");
+        String val = map.computeIfPresent(1L, (k, v) -> v + "-updated");
+        Assertions.assertEquals("one-updated", val);
+        Assertions.assertEquals("one-updated", map.get(1L));
+
+        // Return null to remove
+        Assertions.assertNull(map.computeIfPresent(1L, (k, v) -> null));
+        Assertions.assertFalse(map.containsKey(1L));
+    }
+
+    @Test
+    void testEntrySet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+
+        ObjectSet<Long2ObjectMap.Entry<String>> entries = 
map.long2ObjectEntrySet();
+        Assertions.assertEquals(2, entries.size());
+
+        Set<Long> keys = new HashSet<>();
+        for (Long2ObjectMap.Entry<String> entry : entries) {
+            keys.add(entry.getLongKey());
+        }
+        Assertions.assertTrue(keys.contains(1L));
+        Assertions.assertTrue(keys.contains(2L));
+    }
+
+    @Test
+    void testKeySet() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(10L, "ten");
+        map.put(20L, "twenty");
+        LongSet keys = map.keySet();
+        Assertions.assertEquals(2, keys.size());
+        Assertions.assertTrue(keys.contains(10L));
+        Assertions.assertTrue(keys.contains(20L));
+    }
+
+    @Test
+    void testValues() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        ObjectCollection<String> values = map.values();
+        Assertions.assertEquals(2, values.size());
+        Assertions.assertTrue(values.contains("one"));
+        Assertions.assertTrue(values.contains("two"));
+    }
+
+    @Test
+    void testStream() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        for (long i = 0; i < 100; i++) {
+            map.put(i, "val-" + i);
+        }
+        long count = map.values().stream().filter(v -> 
v.startsWith("val-")).count();
+        Assertions.assertEquals(100, count);
+    }
+
+    @Test
+    void testForEach() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, "one");
+        map.put(2L, "two");
+        Map<Long, String> collected = new HashMap<>();
+        map.forEach(collected::put);
+        Assertions.assertEquals(2, collected.size());
+        Assertions.assertEquals("one", collected.get(1L));
+    }
+
+    @Test
+    void testNullValues() {
+        ConcurrentLong2ObjectHashMap<String> map = new 
ConcurrentLong2ObjectHashMap<>();
+        map.put(1L, null);
+        Assertions.assertTrue(map.containsKey(1L));
+        Assertions.assertNull(map.get(1L));

Review Comment:
   This test (and the current implementation) allows storing null values, but 
`ConcurrentHashMap` forbids nulls. Since the PR describes these maps as drop-in 
replacements for `ConcurrentHashMap<Long, V>`, consider rejecting null values 
(throw NPE on `put`/`putIfAbsent`/`replace`/`compute*` results) and 
update/remove this test, or update the docs to explicitly state that null 
values are supported and semantics differ from `ConcurrentHashMap`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to