This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 611bae8082f branch-4.1: [opt](cloud) cache cluster id per query and
drop redundant locks on getBackendId hot path #63636 (#64276)
611bae8082f is described below
commit 611bae8082fed5f55710d630e8dd7876310c46e9
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jun 10 14:28:12 2026 +0800
branch-4.1: [opt](cloud) cache cluster id per query and drop redundant
locks on getBackendId hot path #63636 (#64276)
Pick apache/doris#63636
---
.../apache/doris/cloud/catalog/CloudReplica.java | 125 +++++----------------
.../apache/doris/cloud/catalog/CloudTablet.java | 15 ++-
.../doris/cloud/system/CloudSystemInfoService.java | 93 ++++++++++++++-
.../org/apache/doris/planner/OlapScanNode.java | 26 ++++-
.../org/apache/doris/planner/OlapTableSink.java | 17 ++-
.../apache/doris/service/FrontendServiceImpl.java | 35 +++++-
.../cloud/system/CloudSystemInfoServiceTest.java | 15 +++
7 files changed, 212 insertions(+), 114 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 22f2464e48d..07a2f416635 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -17,20 +17,17 @@
package org.apache.doris.cloud.catalog;
-import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
-import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
@@ -38,7 +35,6 @@ import com.google.common.hash.Hashing;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -170,20 +166,25 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
@Override
public long getBackendId() throws ComputeGroupException {
- return getBackendIdImpl(getCurrentClusterId());
+ return getBackendIdImpl(cloudInfoService().getCurrentClusterId());
}
- public long getBackendId(String beEndpoint) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getClusterNameByBeAddr(beEndpoint);
- String physicalClusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getPhysicalCluster(clusterName);
+ // Variant of getBackendId() for callers that have already resolved the cluster id
+ // once per request. It hands the supplied id straight to getBackendIdImpl, so the
+ // per-replica ConnectContext/priv/status/autoStart/existence pipeline is skipped.
+ // A null or empty clusterId yields -1 (see getBackendIdImpl).
+ public long getBackendIdWithClusterId(String clusterId) throws
ComputeGroupException {
+ return getBackendIdImpl(clusterId);
+ }
+ public long getBackendId(String beEndpoint) {
try {
- String clusterId = getCloudClusterIdByName(physicalClusterName);
+ CloudSystemInfoService infoService = cloudInfoService();
+ String clusterName =
infoService.getClusterNameByBeAddr(beEndpoint);
+ String physicalClusterName =
infoService.getPhysicalCluster(clusterName);
+ String clusterId =
infoService.resolveClusterIdByName(physicalClusterName);
return getBackendIdImpl(clusterId);
} catch (ComputeGroupException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("failed to get compute group name {}",
physicalClusterName, e);
+ LOG.debug("failed to get compute group name for endpoint {}",
beEndpoint, e);
}
return -1;
}
@@ -192,7 +193,7 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
public long getPrimaryBackendId() {
String clusterId;
try {
- clusterId = getCurrentClusterId();
+ clusterId = cloudInfoService().getCurrentClusterId();
} catch (ComputeGroupException e) {
return -1L;
}
@@ -204,6 +205,19 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
return getClusterPrimaryBackendId(clusterId);
}
+    // Narrows Env.getCurrentSystemInfo() to CloudSystemInfoService. When Env holds a plain
+    // SystemInfoService instead (typically a unit test that mocks the base type), a
+    // ComputeGroupException is raised rather than letting a cast fail. A production
+    // cloud-mode FE is not expected to reach the throwing branch.
+    private static CloudSystemInfoService cloudInfoService() throws ComputeGroupException {
+        SystemInfoService current = Env.getCurrentSystemInfo();
+        if (!(current instanceof CloudSystemInfoService)) {
+            throw new ComputeGroupException(
+                    "current system info service is not cloud-aware",
+                    ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+        }
+        return (CloudSystemInfoService) current;
+    }
+
public long getClusterPrimaryBackendId(String clusterId) {
if (isColocated()) {
try {
@@ -216,91 +230,6 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
return primaryClusterToBackend.getOrDefault(clusterId, -1L);
}
- private String getCurrentClusterId() throws ComputeGroupException {
- // Not in a connect session
- String cluster = null;
- ConnectContext context = ConnectContext.get();
- if (context != null) {
- // TODO(wb) rethinking whether should update err status.
- cluster = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getPhysicalCluster(context.getCloudCluster());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("get compute group by context {}", cluster);
- }
-
- UserIdentity currentUid = context.getCurrentUserIdentity();
- if (currentUid != null &&
!StringUtils.isEmpty(currentUid.getQualifiedUser())) {
- try {
- ((CloudEnv)
Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
- } catch (Exception e) {
- LOG.warn("check compute group {} for {} auth failed.",
cluster,
- context.getCurrentUserIdentity().toString());
- throw new ComputeGroupException(
- String.format("context compute group %s check auth
failed, user is %s",
- cluster,
context.getCurrentUserIdentity().toString()),
-
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
- }
- } else {
- LOG.info("connect context user is null.");
- throw new ComputeGroupException("connect context's user is
null",
-
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
- }
-
- String clusterStatus = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getCloudStatusByName(cluster);
- if (!Strings.isNullOrEmpty(clusterStatus)
- && Cloud.ClusterStatus.valueOf(clusterStatus)
- == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
- LOG.warn("auto start compute group {} in manual shutdown
status", cluster);
- throw new ComputeGroupException(
- String.format("The current compute group %s has been
manually shutdown", cluster),
-
ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("connect context is null in getBackendId");
- }
- throw new ComputeGroupException("connect context not set cluster ",
-
ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
- }
-
- return getCloudClusterIdByName(cluster);
- }
-
- private String getCloudClusterIdByName(String cluster) throws
ComputeGroupException {
- // if cluster is SUSPENDED, wait
- String wakeUPCluster = "";
- try {
- wakeUPCluster = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStart(cluster);
- } catch (DdlException e) {
- // this function cant throw exception. so just log it
- LOG.warn("cant resume compute group {}, exception", cluster, e);
- }
- if (!Strings.isNullOrEmpty(wakeUPCluster) &&
!cluster.equals(wakeUPCluster)) {
- cluster = wakeUPCluster;
- LOG.warn("get backend input compute group {} useless, so auto
start choose a new one compute group {}",
- cluster, wakeUPCluster);
- }
- // check default compute group valid.
- if (Strings.isNullOrEmpty(cluster)) {
- LOG.warn("failed to get available be, clusterName: {}", cluster);
- throw new ComputeGroupException("compute group name is empty",
-
ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
- }
- boolean exist = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
- .getCloudClusterNames().contains(cluster);
- if (!exist) {
- // can't use this default compute group, plz change another
- LOG.warn("compute group: {} is not existed", cluster);
- throw new ComputeGroupException(
- String.format("The current compute group %s is not registered
in the system", cluster),
-
ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
- }
-
- return ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterIdByName(cluster);
- }
-
private long getBackendIdImpl(String clusterId) throws
ComputeGroupException {
if (Strings.isNullOrEmpty(clusterId)) {
return -1L;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
index c1a3ed71290..541570b079e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -74,7 +75,19 @@ public class CloudTablet extends Tablet implements
GsonPostProcessable {
@Override
public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws
UserException {
- Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMap();
+ // Per-tablet entry point: resolves cluster id here for callers that
don't hoist it.
+ // High-tablet-count callers should resolve once and pass the cluster
id in via
+ // getNormalReplicaBackendPathMapByClusterId to amortize the
resolution.
+ String clusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCurrentClusterId();
+ return getNormalReplicaBackendPathMapByClusterId(clusterId);
+ }
+
+ // Cluster id is supplied by the caller (resolved lazily once per request),
+ // bypassing the per-replica
ConnectContext/priv/status/autoStart/existence pipeline.
+ public Multimap<Long, Long>
getNormalReplicaBackendPathMapByClusterId(String clusterId) throws
UserException {
+ TabletSlidingWindowAccessStats.recordTablet(getId());
+ Multimap<Long, Long> pathMap =
super.getNormalReplicaBackendPathMapImpl(null,
+ (rep, be) -> ((CloudReplica)
rep).getBackendIdWithClusterId(clusterId));
return backendPathMapReprocess(pathMap);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 182aa9676a3..a60d5d46e3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.system;
import org.apache.doris.analysis.ModifyBackendClause;
import org.apache.doris.analysis.ModifyBackendHostNameClause;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.cloud.catalog.CloudEnv;
@@ -120,18 +121,100 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
public ComputeGroup getComputeGroupByName(String computeGroupName) {
- LOG.debug("get id {} computeGroupIdToComputeGroup : {} ",
computeGroupName, computeGroupIdToComputeGroup);
+ // rlock guards the compound name->id->group lookup: writers
(add/remove/rename)
+ // update both maps under wlock, and the read must observe a
consistent snapshot
+ // so callers like getPhysicalCluster don't transiently see a virtual
group name
+ // with a null group and fall back to treating it as a physical
cluster.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get id {} computeGroupIdToComputeGroup : {} ",
computeGroupName, computeGroupIdToComputeGroup);
+ }
try {
rlock.lock();
- if (!clusterNameToId.containsKey(computeGroupName)) {
- return null;
- }
- return
computeGroupIdToComputeGroup.get(clusterNameToId.get(computeGroupName));
+ String id = clusterNameToId.get(computeGroupName);
+ return id == null ? null : computeGroupIdToComputeGroup.get(id);
} finally {
rlock.unlock();
}
}
+ // Returns true when clusterName is non-empty and is a key of clusterNameToId.
+ // Null and empty names return false without consulting the map.
+ // NOTE(review): unlike getComputeGroupByName, this reads clusterNameToId without
+ // taking rlock -- presumably safe because the map is a concurrent map; confirm.
+ public boolean containsCloudCluster(String clusterName) {
+ return !Strings.isNullOrEmpty(clusterName) &&
clusterNameToId.containsKey(clusterName);
+ }
+
+    // Resolves the cluster id for the current thread's ConnectContext. Steps, in order:
+    //   1. require a ConnectContext (else CONNECT_CONTEXT_NOT_SET);
+    //   2. map the context's cloud cluster to its physical cluster name;
+    //   3. require a user with a non-empty qualified name and pass checkCloudClusterPriv
+    //      (else CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+    //   4. reject a cluster whose status is MANUAL_SHUTDOWN
+    //      (CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
+    //   5. delegate to resolveClusterIdByName for auto-start, existence check and
+    //      the name -> id mapping.
+    // The result is identical for every tablet/replica within a single request, so hot
+    // paths should resolve once and reuse the cached value.
+    public String getCurrentClusterId() throws ComputeGroupException {
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("connect context is null in getCurrentClusterId");
+            }
+            throw new ComputeGroupException("connect context not set cluster ",
+                    ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET);
+        }
+
+        String cluster = getPhysicalCluster(context.getCloudCluster());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get compute group by context {}", cluster);
+        }
+
+        UserIdentity currentUid = context.getCurrentUserIdentity();
+        if (currentUid == null || Strings.isNullOrEmpty(currentUid.getQualifiedUser())) {
+            LOG.info("connect context user is null.");
+            throw new ComputeGroupException("connect context's user is null",
+                    ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+        }
+        try {
+            ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
+        } catch (Exception e) {
+            // The rethrown ComputeGroupException carries no cause, so pass the original
+            // exception to the logger; otherwise the reason the check failed is lost.
+            LOG.warn("check compute group {} for {} auth failed.", cluster, currentUid, e);
+            throw new ComputeGroupException(
+                    String.format("context compute group %s check auth failed, user is %s", cluster, currentUid),
+                    ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_DEFAULT_COMPUTE_GROUP);
+        }
+
+        // An empty/unknown status is not treated as an error here; only an explicit
+        // MANUAL_SHUTDOWN blocks the request.
+        String clusterStatus = getCloudStatusByName(cluster);
+        if (!Strings.isNullOrEmpty(clusterStatus)
+                && Cloud.ClusterStatus.valueOf(clusterStatus) == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
+            LOG.warn("auto start compute group {} in manual shutdown status", cluster);
+            throw new ComputeGroupException(
+                    String.format("The current compute group %s has been manually shutdown", cluster),
+                    ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_BEEN_MANUAL_SHUTDOWN);
+        }
+
+        return resolveClusterIdByName(cluster);
+    }
+
+    // Resolves a cluster name to its id, handling auto-start (the cluster may resume
+    // under a different name) and validating that the final name is registered.
+    // Throws ComputeGroupException with CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP when no
+    // usable name remains, or CURRENT_COMPUTE_GROUP_NOT_EXIST when the name is unknown.
+    public String resolveClusterIdByName(String cluster) throws ComputeGroupException {
+        String wakeUPCluster = "";
+        try {
+            wakeUPCluster = waitForAutoStart(cluster);
+        } catch (DdlException e) {
+            // Best effort: a failed resume is only logged; the checks below decide the outcome.
+            LOG.warn("cant resume compute group {}, exception", cluster, e);
+        }
+        // Compare from the non-null side: the incoming name may be null, and calling
+        // cluster.equals(...) would then throw instead of adopting the resumed cluster.
+        if (!Strings.isNullOrEmpty(wakeUPCluster) && !wakeUPCluster.equals(cluster)) {
+            // Log before reassigning so the message shows the rejected input name
+            // and its replacement, not the replacement twice.
+            LOG.warn("get backend input compute group {} useless, so auto start choose a new one compute group {}",
+                    cluster, wakeUPCluster);
+            cluster = wakeUPCluster;
+        }
+        if (Strings.isNullOrEmpty(cluster)) {
+            LOG.warn("failed to get available be, clusterName: {}", cluster);
+            throw new ComputeGroupException("compute group name is empty",
+                    ComputeGroupException.FailedTypeEnum.CONNECT_CONTEXT_NOT_SET_COMPUTE_GROUP);
+        }
+        if (!containsCloudCluster(cluster)) {
+            LOG.warn("compute group: {} is not existed", cluster);
+            throw new ComputeGroupException(
+                    String.format("The current compute group %s is not registered in the system", cluster),
+                    ComputeGroupException.FailedTypeEnum.CURRENT_COMPUTE_GROUP_NOT_EXIST);
+        }
+        return getCloudClusterIdByName(cluster);
+    }
+
public ComputeGroup getComputeGroupById(String computeGroupId) {
try {
rlock.lock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 860d77d43dc..bf674dedcc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
@@ -43,7 +44,9 @@ import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -515,6 +518,8 @@ public class OlapScanNode extends ScanNode {
ImmutableMap<Long, Backend> allBackends =
olapTable.getAllBackendsByAllCluster();
long partitionVisibleVersion = visibleVersion;
String partitionVisibleVersionStr = fastToString(visibleVersion);
+ // Lazy: resolved on the first CloudReplica that needs it.
+ String cachedClusterId = null;
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
long tabletVisibleVersion = partitionVisibleVersion;
@@ -583,7 +588,16 @@ public class OlapScanNode extends ScanNode {
replicas.sort(Replica.ID_COMPARATOR);
Replica replica = replicas.get(useFixReplica >=
replicas.size() ? replicas.size() - 1 : useFixReplica);
if
(context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
- long beId = replica.getBackendId();
+ long beId;
+ if (replica instanceof CloudReplica) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ beId = ((CloudReplica)
replica).getBackendIdWithClusterId(cachedClusterId);
+ } else {
+ beId = replica.getBackendId();
+ }
Backend backend = allBackends.get(beId);
// If the fixed replica is bad, then not clear the
replicas using random replica
if (backend == null || !backend.isAlive()) {
@@ -637,7 +651,15 @@ public class OlapScanNode extends ScanNode {
Backend backend = null;
long backendId = -1;
try {
- backendId = replica.getBackendId();
+ if (replica instanceof CloudReplica) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ backendId = ((CloudReplica)
replica).getBackendIdWithClusterId(cachedClusterId);
+ } else {
+ backendId = replica.getBackendId();
+ }
backend = allBackends.get(backendId);
} catch (ComputeGroupException e) {
LOG.warn("failed to get backend {} for replica {}",
backendId, replica.getId(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 8520249dd97..1f9ba83b37b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -43,7 +43,9 @@ import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -749,6 +751,8 @@ public class OlapTableSink extends DataSink {
TOlapTableLocationParam slaveLocationParam = new
TOlapTableLocationParam();
// BE id -> path hash
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
+ // Lazy: resolved on the first CloudTablet that needs it.
+ String cachedClusterId = null;
for (long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
int loadRequiredReplicaNum =
table.getLoadRequiredReplicaNum(partition.getId());
@@ -759,7 +763,16 @@ public class OlapTableSink extends DataSink {
StringBuilder errMsgBuilder = new StringBuilder();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
try {
- bePathsMap = tablet.getNormalReplicaBackendPathMap();
+ if (tablet instanceof CloudTablet) {
+ if (cachedClusterId == null) {
+ cachedClusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap = ((CloudTablet) tablet)
+
.getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+ } else {
+ bePathsMap =
tablet.getNormalReplicaBackendPathMap();
+ }
if (bePathsMap.keySet().size() <
loadRequiredReplicaNum) {
errMsgBuilder.append("tablet
").append(tablet.getId())
.append(" alive replica num
").append(bePathsMap.keySet().size())
@@ -785,7 +798,7 @@ public class OlapTableSink extends DataSink {
} catch (ComputeGroupException e) {
LOG.warn("failed to get replica backend path for
tablet " + tablet.getId(), e);
errMsgBuilder.append(", ").append(e.toString());
- throw new
UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString());
+ throw new
UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString(), e);
}
if (!Config.isCloudMode()) {
debugWriteRandomChooseSink(tablet,
partition.getVisibleVersion(), bePathsMap);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 3957eec4474..f749cc2b0f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3889,6 +3889,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
List<TOlapTablePartition> partitions = Lists.newArrayList();
+ final boolean hasBeEndpoint = request.isSetBeEndpoint();
+ // Lazy: resolved on the first CloudTablet that needs it (skipped on
cache-hit).
+ String cachedClusterId = null;
for (String partitionName : addPartitionClauseMap.keySet()) {
Partition partition = table.getPartition(partitionName);
// For thread safety, we preserve the tablet distribution
information of each partition
@@ -3932,9 +3935,17 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
- if (Config.isCloudMode() && request.isSetBeEndpoint())
{
- bePathsMap = ((CloudTablet) tablet)
-
.getNormalReplicaBackendPathMap(request.be_endpoint);
+ if (tablet instanceof CloudTablet) {
+ CloudTablet cloudTablet = (CloudTablet) tablet;
+ if (hasBeEndpoint) {
+ bePathsMap =
cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+ } else {
+ if (cachedClusterId == null) {
+ cachedClusterId =
((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap =
cloudTablet.getNormalReplicaBackendPathMapByClusterId(cachedClusterId);
+ }
} else {
bePathsMap =
tablet.getNormalReplicaBackendPathMap();
}
@@ -4206,6 +4217,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+ final boolean replaceHasBeEndpoint = request.isSetBeEndpoint();
+ // Lazy: resolved on the first CloudTablet that needs it.
+ String replaceCachedClusterId = null;
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
// For thread safety, we preserve the tablet distribution
information of each partition
@@ -4251,9 +4265,18 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
- if (Config.isCloudMode() && request.isSetBeEndpoint())
{
- bePathsMap = ((CloudTablet) tablet)
-
.getNormalReplicaBackendPathMap(request.be_endpoint);
+ if (tablet instanceof CloudTablet) {
+ CloudTablet cloudTablet = (CloudTablet) tablet;
+ if (replaceHasBeEndpoint) {
+ bePathsMap =
cloudTablet.getNormalReplicaBackendPathMap(request.be_endpoint);
+ } else {
+ if (replaceCachedClusterId == null) {
+ replaceCachedClusterId =
((CloudSystemInfoService) Env.getCurrentSystemInfo())
+ .getCurrentClusterId();
+ }
+ bePathsMap = cloudTablet
+
.getNormalReplicaBackendPathMapByClusterId(replaceCachedClusterId);
+ }
} else {
bePathsMap =
tablet.getNormalReplicaBackendPathMap();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 85bd4677a0c..11b288dfc2a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -973,6 +973,21 @@ public class CloudSystemInfoServiceTest {
}
}
+ // Covers containsCloudCluster for null/empty, unknown, and registered cluster names.
+ @Test
+ public void testContainsCloudCluster() {
+ infoService = new CloudSystemInfoService();
+ // Null and empty names must return false.
+ Assert.assertFalse(infoService.containsCloudCluster(null));
+ Assert.assertFalse(infoService.containsCloudCluster(""));
+ // Unknown cluster name -> false.
+ Assert.assertFalse(infoService.containsCloudCluster("absent_cluster"));
+ // Register a cluster; lookup must hit.
+ infoService.addVirtualClusterInfoToMapsNoLock("cid_1", "cluster_1");
+ Assert.assertTrue(infoService.containsCloudCluster("cluster_1"));
+ // A name that was never registered still misses once another cluster exists.
+ Assert.assertFalse(infoService.containsCloudCluster("cluster_2"));
+ }
+
/**
* Helper method to create a test ConnectContext with specific cluster name
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]