This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch opt/cloud-replica-central-index in repository https://gitbox.apache.org/repos/asf/doris.git
commit bd6991b4b4cf51df2c215a12579c76cc331fc3e3 Author: Yongqiang YANG <[email protected]> AuthorDate: Mon Mar 30 22:12:53 2026 -0700 [opt](cloud) Restore memClusterToBackends cache and add post-serialize cleanup - Restore per-replica memClusterToBackends transient cache for multi-replica hash assignments (not serialized, should not have been removed) - Add GsonPreProcessable.gsonPostSerialize() hook to null out primaryClusterToBackend after checkpoint serialization, preventing memory leak where transient HashMap survives after write Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../doris/persist/gson/GsonPreProcessable.java | 3 +++ .../apache/doris/cloud/catalog/CloudReplica.java | 31 +++++++++++++++++++++- .../org/apache/doris/persist/gson/GsonUtils.java | 3 +++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java b/fe/fe-common/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java index 19a8450e62e..5a255bb47a6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java +++ b/fe/fe-common/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java @@ -21,4 +21,7 @@ import java.io.IOException; public interface GsonPreProcessable { public void gsonPreProcess() throws IOException; + + /** Called after serialization completes, to clean up any transient state set by gsonPreProcess(). */ + default void gsonPostSerialize() throws IOException {} } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index e2ab84cae12..557d54f4461 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -94,6 +94,8 @@ public class CloudReplica extends Replica implements GsonPostProcessable, GsonPr @SerializedName(value = "rsc") private long rowsetCount = 1L; // [0-1] rowset + private Map<String, List<Long>> memClusterToBackends = null; + private static final Random rand = new Random(); public CloudReplica() { @@ -329,8 +331,16 @@ public class CloudReplica extends Replica implements GsonPostProcessable, GsonPr int coldReadRand = rand.nextInt(100); boolean allowColdRead = coldReadRand < Config.cloud_cold_read_percent; + initMemClusterToBackends(); + boolean replicaEnough = memClusterToBackends.get(clusterId) != null + && memClusterToBackends.get(clusterId).size() > indexRand; + long backendId = -1; - if (!allowColdRead) { + if (replicaEnough) { + backendId = memClusterToBackends.get(clusterId).get(indexRand); + } + + if (!replicaEnough && !allowColdRead) { long primaryBe = getCloudInvertedIndex().getPrimaryBeId(clusterId, getId()); if (primaryBe != -1L) { backendId = primaryBe; @@ -484,6 +494,17 @@ public class CloudReplica extends Replica implements GsonPostProcessable, GsonPr return (hashValue % beNum + beNum) % beNum; } + private void initMemClusterToBackends() { + // the enable_cloud_multi_replica is not used now + if (memClusterToBackends == null) { + synchronized (this) { + if (memClusterToBackends == null) { + memClusterToBackends = new ConcurrentHashMap<>(); + } + } + } + } + private List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) throws ComputeGroupException { // TODO(luwei) list should be sorted @@ -543,9 +564,12 @@ public class CloudReplica extends Replica implements GsonPostProcessable, GsonPr LOG.info("picked beId {}, replicaId {}, partId {}, beNum {}, replicaIdx {}, picked Index {}, hashVal {}", pickedBeId, getId(), partitionId, availableBes.size(), idx, index, hashCode == null ? -1 : hashCode.asLong()); + // save to memClusterToBackends map bes.add(pickedBeId); } + memClusterToBackends.put(clusterId, bes); + return bes; } @@ -663,6 +687,11 @@ public class CloudReplica extends Replica implements GsonPostProcessable, GsonPr this.primaryClusterToBackend = snapshot.isEmpty() ? null : new HashMap<>(snapshot); } + @Override + public void gsonPostSerialize() throws IOException { + this.primaryClusterToBackend = null; + } + @Override public void gsonPostProcess() throws IOException { if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 67c0a1fcb53..fa0c50ac1bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -932,6 +932,9 @@ public class GsonUtils { ((GsonPreProcessable) value).gsonPreProcess(); } delegate.write(out, value); + if (value instanceof GsonPreProcessable) { + ((GsonPreProcessable) value).gsonPostSerialize(); + } } public T read(JsonReader reader) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
