This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch feature/cloud-replica-memory-opt
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7f44a9d89fff117b4455ad7332007697422f2d1a Author: Yongqiang YANG <[email protected]> AuthorDate: Thu Mar 12 06:06:46 2026 -0700 [opt](memory) Lazy-init ConcurrentHashMaps in CloudReplica Replace eager ConcurrentHashMap initialization with null defaults and lazy allocation via double-checked locking. For millions of CloudReplica instances, this saves ~120 bytes per replica when maps are empty. - primaryClusterToBackend: null by default, allocated on first put - secondaryClusterToBackends: null by default, allocated on first put - Add volatile for thread-safe lazy initialization - Add null-safe access patterns throughout all methods - Use initial capacity 2 (vs default 16) for small map optimization Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../apache/doris/cloud/catalog/CloudReplica.java | 92 +++++++++++++++++----- 1 file changed, 71 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index afd2fd2501f..b5f119cc803 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -56,7 +56,7 @@ public class CloudReplica extends Replica implements GsonPostProcessable { @SerializedName(value = "bes") private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends = null; @SerializedName(value = "be") - private ConcurrentHashMap<String, Long> primaryClusterToBackend = new ConcurrentHashMap<>(); + private volatile ConcurrentHashMap<String, Long> primaryClusterToBackend; @SerializedName(value = "dbId") private long dbId = -1; @SerializedName(value = "tableId") @@ -76,8 +76,7 @@ public class CloudReplica extends Replica implements GsonPostProcessable { private Map<String, List<Long>> memClusterToBackends = null; // clusterId, secondaryBe, changeTimestamp - private Map<String, Pair<Long, Long>> 
secondaryClusterToBackends - = new ConcurrentHashMap<String, Pair<Long, Long>>(); + private volatile Map<String, Pair<Long, Long>> secondaryClusterToBackends; public CloudReplica() { } @@ -99,6 +98,34 @@ public class CloudReplica extends Replica implements GsonPostProcessable { this.idx = idx; } + private ConcurrentHashMap<String, Long> getOrCreatePrimaryMap() { + ConcurrentHashMap<String, Long> map = primaryClusterToBackend; + if (map == null) { + synchronized (this) { + map = primaryClusterToBackend; + if (map == null) { + map = new ConcurrentHashMap<>(2); + primaryClusterToBackend = map; + } + } + } + return map; + } + + private Map<String, Pair<Long, Long>> getOrCreateSecondaryMap() { + Map<String, Pair<Long, Long>> map = secondaryClusterToBackends; + if (map == null) { + synchronized (this) { + map = secondaryClusterToBackends; + if (map == null) { + map = new ConcurrentHashMap<>(2); + secondaryClusterToBackends = map; + } + } + } + return map; + } + private boolean isColocated() { return Env.getCurrentColocateIndex().isColocateTableNoLock(tableId); } @@ -206,7 +233,8 @@ public class CloudReplica extends Replica implements GsonPostProcessable { } } - return primaryClusterToBackend.getOrDefault(clusterId, -1L); + ConcurrentHashMap<String, Long> map = primaryClusterToBackend; + return map != null ? 
map.getOrDefault(clusterId, -1L) : -1L; } private String getCurrentClusterId() throws ComputeGroupException { @@ -316,8 +344,9 @@ public class CloudReplica extends Replica implements GsonPostProcessable { backendId = memClusterToBackends.get(clusterId).get(indexRand); } - if (!replicaEnough && !allowColdRead && primaryClusterToBackend.containsKey(clusterId)) { - backendId = primaryClusterToBackend.get(clusterId); + ConcurrentHashMap<String, Long> priMap = primaryClusterToBackend; + if (!replicaEnough && !allowColdRead && priMap != null && priMap.containsKey(clusterId)) { + backendId = priMap.get(clusterId); } if (backendId > 0) { @@ -393,7 +422,11 @@ public class CloudReplica extends Replica implements GsonPostProcessable { } public Backend getSecondaryBackend(String clusterId) { - Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends; + if (secMap == null) { + return null; + } + Pair<Long, Long> secondBeAndChangeTimestamp = secMap.get(clusterId); if (secondBeAndChangeTimestamp == null) { return null; } @@ -575,8 +608,11 @@ public class CloudReplica extends Replica implements GsonPostProcessable { } public void updateClusterToPrimaryBe(String cluster, long beId) { - primaryClusterToBackend.put(cluster, beId); - secondaryClusterToBackends.remove(cluster); + getOrCreatePrimaryMap().put(cluster, beId); + Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends; + if (secMap != null) { + secMap.remove(cluster); + } } /** @@ -590,19 +626,29 @@ public class CloudReplica extends Replica implements GsonPostProcessable { LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}", cluster, beId, changeTimestamp, this); } - secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp)); + getOrCreateSecondaryMap().put(cluster, Pair.of(beId, changeTimestamp)); } public void clearClusterToBe(String cluster) { - 
primaryClusterToBackend.remove(cluster); - secondaryClusterToBackends.remove(cluster); + ConcurrentHashMap<String, Long> priMap = primaryClusterToBackend; + if (priMap != null) { + priMap.remove(cluster); + } + Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends; + if (secMap != null) { + secMap.remove(cluster); + } } // ATTN: This func is only used by redundant tablet report clean in bes. // Only the master node will do the diff logic, // so just only need to clean up secondaryClusterToBackends on the master node. public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) { - Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId); + Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends; + if (secMap == null) { + return; + } + Pair<Long, Long> secondBeAndChangeTimestamp = secMap.get(clusterId); if (secondBeAndChangeTimestamp == null) { return; } @@ -612,19 +658,22 @@ public class CloudReplica extends Replica implements GsonPostProcessable { if (changeTimestamp < expireTimestamp) { LOG.debug("remove clusterId {} secondary beId {} changeTimestamp {} expireTimestamp {} replica info {}", clusterId, beId, changeTimestamp, expireTimestamp, this); - secondaryClusterToBackends.remove(clusterId); + secMap.remove(clusterId); return; } } public List<Backend> getAllPrimaryBes() { List<Backend> result = new ArrayList<Backend>(); - primaryClusterToBackend.forEach((clusterId, beId) -> { - if (beId != -1) { - Backend backend = Env.getCurrentSystemInfo().getBackend(beId); - result.add(backend); - } - }); + ConcurrentHashMap<String, Long> map = primaryClusterToBackend; + if (map != null) { + map.forEach((clusterId, beId) -> { + if (beId != -1) { + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + result.add(backend); + } + }); + } return result; } @@ -655,11 +704,12 @@ public class CloudReplica extends Replica implements GsonPostProcessable { this.getId(), 
this.primaryClusterToBackends, this.primaryClusterToBackend); } if (primaryClusterToBackends != null) { + ConcurrentHashMap<String, Long> map = getOrCreatePrimaryMap(); for (Map.Entry<String, List<Long>> entry : primaryClusterToBackends.entrySet()) { String clusterId = entry.getKey(); List<Long> beIds = entry.getValue(); if (beIds != null && !beIds.isEmpty()) { - primaryClusterToBackend.put(clusterId, beIds.get(0)); + map.put(clusterId, beIds.get(0)); } } this.primaryClusterToBackends = null; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
