This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 67c79a70658 [fix](cloud tvf) Fix tvf query run in cloud multi cluster (#37157) 67c79a70658 is described below commit 67c79a70658b675130773fde6bcfd3fa5a490b43 Author: deardeng <565620...@qq.com> AuthorDate: Tue Jul 30 21:22:48 2024 +0800 [fix](cloud tvf) Fix tvf query run in cloud multi cluster (#37157) Fix ``` mysql> select * from numbers("number" = "10"); ERROR 1105 (HY000): errCode = 2, detailMessage = There is no scanNode Backend available.[10002: not exist] ``` --- .../doris/alter/AlterLightSchChangeHelper.java | 5 +- .../java/org/apache/doris/alter/SystemHandler.java | 10 +- .../analysis/AdminCancelRebalanceDiskStmt.java | 5 +- .../apache/doris/analysis/AdminCleanTrashStmt.java | 5 +- .../doris/analysis/AdminRebalanceDiskStmt.java | 11 +- .../apache/doris/analysis/ShowTrashDiskStmt.java | 4 +- .../org/apache/doris/analysis/ShowTrashStmt.java | 4 +- .../main/java/org/apache/doris/catalog/Env.java | 4 +- .../org/apache/doris/catalog/StorageVaultMgr.java | 13 +- .../org/apache/doris/catalog/TabletStatMgr.java | 9 +- .../org/apache/doris/clone/TabletScheduler.java | 8 +- .../doris/cloud/system/CloudSystemInfoService.java | 16 +-- .../apache/doris/common/proc/ReplicasProcNode.java | 5 +- .../apache/doris/common/proc/TabletsProcDir.java | 9 +- .../org/apache/doris/common/proc/TrashProcDir.java | 8 +- .../common/publish/ClusterStatePublisher.java | 140 --------------------- .../doris/common/publish/TopicPublisherThread.java | 8 +- .../apache/doris/common/util/AutoBucketUtils.java | 17 ++- .../doris/datasource/FederationBackendPolicy.java | 6 +- .../apache/doris/datasource/FileQueryScanNode.java | 2 +- .../doris/datasource/jdbc/JdbcExternalCatalog.java | 11 +- .../doris/httpv2/rest/manager/NodeAction.java | 2 +- .../doris/httpv2/restv2/StatisticAction.java | 17 ++- .../org/apache/doris/load/GroupCommitManager.java | 10 +- .../org/apache/doris/load/StreamLoadRecordMgr.java | 13 +- 
.../doris/metric/SimpleCoreMetricVisitor.java | 15 ++- .../BackendDistributedPlanWorkerManager.java | 2 +- .../planner/BackendPartitionedSchemaScanNode.java | 8 +- .../java/org/apache/doris/policy/PolicyMgr.java | 2 +- .../main/java/org/apache/doris/qe/Coordinator.java | 2 +- .../apache/doris/qe/InsertStreamTxnExecutor.java | 2 +- .../org/apache/doris/qe/QueryCancelWorker.java | 13 +- .../java/org/apache/doris/qe/ShowExecutor.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 5 +- .../apache/doris/qe/cache/CacheCoordinator.java | 6 +- .../apache/doris/resource/AdmissionControl.java | 10 +- .../apache/doris/service/FrontendServiceImpl.java | 2 +- .../java/org/apache/doris/system/HeartbeatMgr.java | 10 +- .../org/apache/doris/system/SystemInfoService.java | 97 ++++++-------- .../tablefunction/DataGenTableValuedFunction.java | 4 +- .../ExternalFileTableValuedFunction.java | 13 +- .../tablefunction/NumbersTableValuedFunction.java | 2 +- .../java/org/apache/doris/alter/AlterTest.java | 2 +- .../analysis/AdminCancelRebalanceDiskStmtTest.java | 8 +- .../org/apache/doris/analysis/CopyIntoTest.java | 2 +- .../CreateTableElasticOnStorageMediumTest.java | 4 +- .../doris/catalog/DynamicPartitionTableTest.java | 5 +- .../apache/doris/catalog/ModifyBackendTest.java | 8 +- .../doris/clone/AddReplicaChoseMediumTest.java | 2 +- .../org/apache/doris/clone/BalanceStatistic.java | 5 +- .../ColocateTableCheckerAndBalancerPerfTest.java | 2 +- .../org/apache/doris/clone/DecommissionTest.java | 4 +- .../doris/clone/DiskReblanceWhenSchedulerIdle.java | 2 +- .../doris/clone/TabletRepairAndBalanceTest.java | 2 +- .../doris/clone/TabletReplicaTooSlowTest.java | 2 +- .../doris/cluster/DecommissionBackendTest.java | 32 ++--- .../doris/cluster/SystemInfoServiceTest.java | 2 +- .../doris/common/util/AutoBucketUtilsTest.java | 7 +- .../doris/load/sync/canal/CanalSyncDataTest.java | 2 +- .../doris/planner/FederationBackendPolicyTest.java | 6 +- 
.../apache/doris/planner/ResourceTagQueryTest.java | 2 +- .../doris/utframe/DemoMultiBackendsTest.java | 6 +- .../org/apache/doris/regression/suite/Suite.groovy | 1 - .../suites/cloud_p0/multi_cluster/test_tvf.groovy | 86 +++++++++++++ 64 files changed, 403 insertions(+), 326 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java index 70a322a8c5f..29130934817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.proto.InternalService.PFetchColIdsRequest; import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder; @@ -156,14 +157,14 @@ public class AlterLightSchChangeHelper { Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new HashMap<>(); try { for (Long beId : beIdToRequest.keySet()) { - final Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beId); + final Backend backend = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().get(beId); final TNetworkAddress address = new TNetworkAddress(Objects.requireNonNull(backend).getHost(), backend.getBrpcPort()); final Future<PFetchColIdsResponse> responseFuture = BackendServiceProxy.getInstance() .getColumnIdsByTabletIds(address, beIdToRequest.get(beId)); beIdToRespFuture.put(beId, responseFuture); } - } catch (RpcException e) { + } catch (RpcException | UserException e) { throw new IllegalStateException("fetch columnIds RPC failed", e); } // wait for and get results diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 26842806483..e0909088f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -288,7 +288,15 @@ public class SystemHandler extends AlterHandler { Set<Tag> decommissionTags = decommissionBackends.stream().map(be -> be.getLocationTag()) .collect(Collectors.toSet()); Map<Tag, Integer> tagAvailBackendNums = Maps.newHashMap(); - for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { + List<Backend> bes; + try { + bes = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().asList(); + } catch (UserException e) { + LOG.warn("Failed to get current cluster backend by current cluster.", e); + return; + } + + for (Backend backend : bes) { long beId = backend.getId(); if (!backend.isScheduleAvailable() || decommissionBackends.stream().anyMatch(be -> be.getId() == beId)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java index 648e7ab47f1..874a1af1368 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.NetUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -36,8 +37,8 @@ import java.util.Map; public class AdminCancelRebalanceDiskStmt extends DdlStmt { private List<Backend> backends = Lists.newArrayList(); - public 
AdminCancelRebalanceDiskStmt(List<String> backends) { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + public AdminCancelRebalanceDiskStmt(List<String> backends) throws UserException { + ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Map<String, Long> backendsID = new HashMap<String, Long>(); for (Backend backend : backendsInfo.values()) { backendsID.put(NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java index 64f1cccf5b6..b6d5cc2ce19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCleanTrashStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.NetUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -36,8 +37,8 @@ import java.util.Map; public class AdminCleanTrashStmt extends DdlStmt { private List<Backend> backends = Lists.newArrayList(); - public AdminCleanTrashStmt(List<String> backends) { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + public AdminCleanTrashStmt(List<String> backends) throws UserException { + ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Map<String, Long> backendsID = new HashMap<String, Long>(); for (Backend backend : backendsInfo.values()) { backendsID.put( diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java index 69f230f33b6..aa48a9a1fc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java @@ -28,17 +28,26 @@ import org.apache.doris.system.Backend; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.List; import java.util.Map; public class AdminRebalanceDiskStmt extends DdlStmt { + private static final Logger LOG = LogManager.getLogger(AdminRebalanceDiskStmt.class); private List<Backend> backends = Lists.newArrayList(); private long timeoutS = 0; public AdminRebalanceDiskStmt(List<String> backends) { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> backendsInfo; + try { + backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("failed to get backends,", e); + return; + } Map<String, Long> backendsID = new HashMap<String, Long>(); for (Backend backend : backendsInfo.values()) { backendsID.put( diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java index cdd3243dfc2..f5fad57d0f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashDiskStmt.java @@ -36,8 +36,8 @@ public class ShowTrashDiskStmt extends ShowStmt { private Backend backend; - public ShowTrashDiskStmt(String backendQuery) { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + public ShowTrashDiskStmt(String backendQuery) throws AnalysisException { + ImmutableMap<Long, 
Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); for (Backend backend : backendsInfo.values()) { String backendStr = NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()); if (backendQuery.equals(backendStr)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java index c69980504d9..3071a657c53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTrashStmt.java @@ -37,8 +37,8 @@ import java.util.List; public class ShowTrashStmt extends ShowStmt { private List<Backend> backends = Lists.newArrayList(); - public ShowTrashStmt() { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + public ShowTrashStmt() throws AnalysisException { + ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); for (Backend backend : backendsInfo.values()) { this.backends.add(backend); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index cc7d0065e8b..99132500672 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6154,8 +6154,8 @@ public class Env { AgentTaskExecutor.submit(batchTask); } - public void cleanUDFCacheTask(DropFunctionStmt stmt) { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + public void cleanUDFCacheTask(DropFunctionStmt stmt) throws UserException { + ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); String functionSignature = stmt.signatureString(); AgentBatchTask batchTask = new AgentBatchTask(); for (Backend backend : backendsInfo.values()) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java index d2b78109f02..2bec2839c86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java @@ -25,10 +25,12 @@ import org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; @@ -37,6 +39,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -228,7 +231,15 @@ public class StorageVaultMgr { } private void alterSyncVaultTask() { - systemInfoService.getAllBackends().forEach(backend -> { + List<Backend> bes; + try { + // get system all backends + bes = systemInfoService.getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("failed to get current cluster backends: {}", e); + return; + } + bes.forEach(backend -> { TNetworkAddress address = backend.getBrpcAddress(); try { BackendServiceProxy.getInstance().alterVaultSync(address, PAlterVaultSyncRequest.newBuilder().build()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 
bf24d2bb390..030dc4d7a0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; @@ -51,7 +52,13 @@ public class TabletStatMgr extends MasterDaemon { @Override protected void runAfterCatalogReady() { - ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> backends; + try { + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("can't get backends info", e); + return; + } long start = System.currentTimeMillis(); taskPool.submit(() -> { // no need to get tablet stat if backend is not alive diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index df0fb2309b3..96be72914ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -180,7 +180,13 @@ public class TabletScheduler extends MasterDaemon { * update working slots at the beginning of each round */ private boolean updateWorkingSlots() { - ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap(); + ImmutableMap<Long, Backend> backends; + try { + backends = infoService.getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("failed to get backends with current cluster", e); + return false; + } for (Backend backend : backends.values()) { if (!backend.hasPathHash() && backend.isAlive()) { // when upgrading, backend may not get path info yet. 
so return false and wait for next round. diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index bf6ed760874..48728efb003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -24,6 +24,7 @@ import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.ClusterPB; import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB; import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; @@ -368,25 +369,18 @@ public class CloudSystemInfoService extends SystemInfoService { } @Override - public List<Backend> getBackendsByCurrentCluster() throws UserException { + public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws AnalysisException { ConnectContext ctx = ConnectContext.get(); if (ctx == null) { - throw new UserException("connect context is null"); + throw new AnalysisException("connect context is null"); } String cluster = ctx.getCurrentCloudCluster(); if (Strings.isNullOrEmpty(cluster)) { - throw new UserException("cluster name is empty"); + throw new AnalysisException("cluster name is empty"); } - //((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster); - - return getBackendsByClusterName(cluster); - } - - @Override - public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException { - List<Backend> backends = getBackendsByCurrentCluster(); + List<Backend> backends = getBackendsByClusterName(cluster); Map<Long, Backend> idToBackend = Maps.newHashMap(); for (Backend be : backends) { idToBackend.put(be.getId(), be); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index edf2a9d3517..8f38b34a2b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; @@ -57,8 +58,8 @@ public class ReplicasProcNode implements ProcNodeInterface { } @Override - public ProcResult fetchResult() { - ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getIdToBackend(); + public ProcResult fetchResult() throws AnalysisException { + ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index 46c89eb3253..12c1adf71d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -65,10 +65,11 @@ public class TabletsProcDir implements ProcDirInterface { this.index = index; } - public List<List<Comparable>> fetchComparableResult(long version, long backendId, Replica.ReplicaState state) { + public List<List<Comparable>> fetchComparableResult(long version, long backendId, Replica.ReplicaState state) + throws AnalysisException { Preconditions.checkNotNull(table); Preconditions.checkNotNull(index); - ImmutableMap<Long, Backend> 
backendMap = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); List<List<Comparable>> tabletInfos = new ArrayList<List<Comparable>>(); Map<Long, String> pathHashToRoot = new HashMap<>(); @@ -179,12 +180,12 @@ public class TabletsProcDir implements ProcDirInterface { return tabletInfos; } - private List<List<Comparable>> fetchComparableResult() { + private List<List<Comparable>> fetchComparableResult() throws AnalysisException { return fetchComparableResult(-1, -1, null); } @Override - public ProcResult fetchResult() { + public ProcResult fetchResult() throws AnalysisException { List<List<Comparable>> tabletInfos = fetchComparableResult(); // sort by tabletId, replicaId ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java index 18605b37c0e..37e5e345181 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TrashProcDir.java @@ -50,7 +50,13 @@ public class TrashProcDir implements ProcDirInterface { private List<Backend> backends = Lists.newArrayList(); public TrashProcDir() { - ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> backendsInfo; + try { + backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("Can't get backends info", e); + return; + } for (Backend backend : backendsInfo.values()) { this.backends.add(backend); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java deleted file mode 100644 index 70a3721d822..00000000000 --- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/ClusterStatePublisher.java +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.publish; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ClientPool; -import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.BackendService; -import org.apache.doris.thrift.TAgentPublishRequest; -import org.apache.doris.thrift.TAgentResult; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TStatusCode; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ExecutorService; - -// This class intend to publish the state of cluster to backends. 
-public class ClusterStatePublisher { - private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class); - private static volatile ClusterStatePublisher INSTANCE; - - private ExecutorService executor = ThreadPoolManager - .newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true); - - private SystemInfoService clusterInfoService; - - // Use public for unit test easily. - public ClusterStatePublisher(SystemInfoService clusterInfoService) { - this.clusterInfoService = clusterInfoService; - } - - public static ClusterStatePublisher getInstance() { - if (INSTANCE == null) { - synchronized (ClusterStatePublisher.class) { - if (INSTANCE == null) { - INSTANCE = new ClusterStatePublisher(Env.getCurrentSystemInfo()); - } - } - } - return INSTANCE; - } - - public void publish(ClusterStateUpdate state, Listener listener, int timeoutMs) { - Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values(); - AckResponseHandler handler = new AckResponseHandler(nodesToPublish, listener); - for (Backend node : nodesToPublish) { - executor.submit(new PublishWorker(state, node, handler)); - } - try { - if (!handler.awaitAllInMs(timeoutMs)) { - Backend[] backends = handler.pendingNodes(); - if (backends.length > 0) { - LOG.warn("timed out waiting for all nodes to publish. 
(pending nodes: {})", - Arrays.toString(backends)); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - public class PublishWorker implements Runnable { - private ClusterStateUpdate stateUpdate; - private Backend node; - private ResponseHandler handler; - - public PublishWorker(ClusterStateUpdate stateUpdate, Backend node, ResponseHandler handler) { - this.stateUpdate = stateUpdate; - this.node = node; - this.handler = handler; - } - - @Override - public void run() { - // Here to publish all worker - TNetworkAddress addr = new TNetworkAddress(node.getHost(), node.getBePort()); - BackendService.Client client = null; - try { - client = ClientPool.backendPool.borrowObject(addr); - } catch (Exception e) { - LOG.warn("Fetch a agent client failed. backend=[{}] reason=[{}]", addr, e); - handler.onFailure(node, e); - return; - } - try { - TAgentPublishRequest request = stateUpdate.toThrift(); - TAgentResult tAgentResult = null; - try { - tAgentResult = client.publishClusterState(request); - } catch (TException e) { - // Ok, lets try another time - if (!ClientPool.backendPool.reopen(client)) { - // Failed another time, throw this - throw e; - } - tAgentResult = client.publishClusterState(request); - } - if (tAgentResult.getStatus().getStatusCode() != TStatusCode.OK) { - // Success execute, no dirty data possibility - LOG.warn("Backend execute publish failed. backend=[{}], message=[{}]", - addr, tAgentResult.getStatus().getErrorMsgs()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Success publish to backend([{}])", addr); - } - // Publish here - handler.onResponse(node); - } catch (TException e) { - LOG.warn("A thrift exception happened when publish to a backend. 
backend=[{}], reason=[{}]", addr, e); - handler.onFailure(node, e); - ClientPool.backendPool.invalidateObject(addr, client); - client = null; - } finally { - ClientPool.backendPool.returnObject(addr, client); - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java index dde45e44e29..f9fdc808498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java @@ -73,7 +73,13 @@ public class TopicPublisherThread extends MasterDaemon { // because it may means workload group/policy is dropped // step 2: publish topic info to all be - Collection<Backend> nodesToPublish = clusterInfoService.getIdToBackend().values(); + Collection<Backend> nodesToPublish; + try { + nodesToPublish = clusterInfoService.getAllBackendsByAllCluster().values(); + } catch (Exception e) { + LOG.warn("get backends failed", e); + return; + } AckResponseHandler handler = new AckResponseHandler(nodesToPublish); for (Backend be : nodesToPublish) { executor.submit(new TopicPublishWorker(request, be, handler)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java index f9291c4cea8..19c4c4bf369 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java @@ -20,6 +20,7 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DiskInfo.DiskState; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -37,7 +38,13 @@ 
public class AutoBucketUtils { private static int getBENum() { SystemInfoService infoService = Env.getCurrentSystemInfo(); - ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap(); + ImmutableMap<Long, Backend> backends; + try { + backends = infoService.getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + logger.warn("failed to get backends with current cluster", e); + return 0; + } int activeBENum = 0; for (Backend backend : backends.values()) { @@ -50,7 +57,13 @@ public class AutoBucketUtils { private static int getBucketsNumByBEDisks() { SystemInfoService infoService = Env.getCurrentSystemInfo(); - ImmutableMap<Long, Backend> backends = infoService.getAllBackendsMap(); + ImmutableMap<Long, Backend> backends; + try { + backends = infoService.getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + logger.warn("failed to get backends with current cluster", e); + return 0; + } int buckets = 0; for (Backend backend : backends.values()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 7938fba4d28..a2b902fd744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -184,9 +184,11 @@ public class FederationBackendPolicy { } public void init(BeSelectionPolicy policy) throws UserException { - backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getBackendsByCurrentCluster())); + backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo() + .getBackendsByCurrentCluster().values().asList())); if (backends.isEmpty()) { - throw new UserException("No available backends"); + throw new UserException("No available backends, " + + "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it"); } for (Backend backend : 
backends) { assignedWeightPerBackend.put(backend, 0L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index df3fbca56d4..97fff0cf1a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -284,7 +284,7 @@ public abstract class FileQueryScanNode extends FileScanNode { TScanRangeLocation location = new TScanRangeLocation(); long backendId = ConnectContext.get().getBackendId(); - Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(backendId); + Backend backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(backendId); location.setBackendId(backendId); location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index 568c9e8480c..32da83e1b37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.CatalogProperty; @@ -346,10 +347,14 @@ public class JdbcExternalCatalog extends ExternalCatalog { private void testBeToJdbcConnection() throws DdlException { Backend aliveBe = null; - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAlive()) { - 
aliveBe = be; + try { + for (Backend be : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) { + if (be.isAlive()) { + aliveBe = be; + } } + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); } if (aliveBe == null) { throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java index 15d8e51e2c7..814b00b49ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -627,7 +627,7 @@ public class NodeAction extends RestBaseController { } else if ("DROP".equals(action)) { currentSystemInfo.dropBackends(hostInfos); } else if ("DECOMMISSION".equals(action)) { - ImmutableMap<Long, Backend> backendsInCluster = currentSystemInfo.getAllBackendsMap(); + ImmutableMap<Long, Backend> backendsInCluster = currentSystemInfo.getAllBackendsByAllCluster(); backendsInCluster.forEach((k, v) -> { hostInfos.stream() .filter(h -> v.getHost().equals(h.getHost()) && v.getHeartbeatPort() == h.getPort()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java index 1c0c2d8f817..dcc847275c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.restv2; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.rest.RestBaseController; @@ -80,7 +81,13 @@ public class 
StatisticAction extends RestBaseController { private long getDiskOccupancy(SystemInfoService infoService) { long diskOccupancy = 0; - List<Backend> backends = infoService.getAllBackends(); + List<Backend> backends; + try { + backends = infoService.getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("failed to get backends by current cluster", e); + return 0; + } for (Backend be : backends) { diskOccupancy += be.getDataUsedCapacityB(); } @@ -89,7 +96,13 @@ public class StatisticAction extends RestBaseController { private long getRemainDisk(SystemInfoService infoService) { long remainDisk = 0; - List<Backend> backends = infoService.getAllBackends(); + List<Backend> backends; + try { + backends = infoService.getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("failed to get backends by current cluster", e); + return 0; + } for (Backend be : backends) { remainDisk += be.getAvailableCapacityB(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index ac365f9166f..20f7b9ed9be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -20,6 +20,7 @@ package org.apache.doris.load; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -279,7 +280,14 @@ public class GroupCommitManager { return cachedBackendId; } - List<Backend> backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + List<Backend> backends = new ArrayList<>(); + try { + backends = new 
ArrayList<>(Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList()); + } catch (AnalysisException e) { + LOG.warn("failed to get backends by all cluster", e); + throw new LoadException(e.getMessage()); + } + if (backends.isEmpty()) { throw new LoadException("No alive backend"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 6c53f354af8..1e1dc192cee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -20,6 +20,7 @@ package org.apache.doris.load; import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; @@ -243,7 +244,13 @@ public class StreamLoadRecordMgr extends MasterDaemon { @Override protected void runAfterCatalogReady() { - ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> backends; + try { + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("Failed to load backends from system info", e); + return; + } long start = System.currentTimeMillis(); int pullRecordSize = 0; Map<Long, Long> beIdToLastStreamLoad = Maps.newHashMap(); @@ -354,8 +361,8 @@ public class StreamLoadRecordMgr extends MasterDaemon { } } - public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) { - ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getIdToBackend(); + public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) throws AnalysisException { + ImmutableMap<Long, Backend> backends = 
Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Map<Long, Long> beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad(); for (Backend backend : backends.values()) { if (beIdToLastStreamLoad.containsKey(backend.getId())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java index 162828d37cd..07cd8353df6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java @@ -18,6 +18,7 @@ package org.apache.doris.metric; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.monitor.jvm.JvmStats; import org.apache.doris.monitor.jvm.JvmStats.MemoryPool; import org.apache.doris.monitor.jvm.JvmStats.Threads; @@ -26,6 +27,8 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Snapshot; import com.google.common.base.Joiner; import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Iterator; import java.util.Map; @@ -38,7 +41,7 @@ import java.util.Map; * query_latency_ms_75 LONG 2 */ public class SimpleCoreMetricVisitor extends MetricVisitor { - + private static final Logger LOG = LogManager.getLogger(SimpleCoreMetricVisitor.class); private static final String TYPE_LONG = "LONG"; private static final String TYPE_DOUBLE = "DOUBLE"; @@ -128,8 +131,14 @@ public class SimpleCoreMetricVisitor extends MetricVisitor { @Override public void visitNodeInfo() { long feDeadNum = Env.getCurrentEnv().getFrontends(null).stream().filter(f -> !f.isAlive()).count(); - long beDeadNum = Env.getCurrentSystemInfo().getIdToBackend().values().stream().filter(b -> !b.isAlive()) - .count(); + long beDeadNum = 0; + try { + beDeadNum = Env.getCurrentSystemInfo().getAllBackendsByAllCluster() + 
.values().stream().filter(b -> !b.isAlive()) + .count(); + } catch (AnalysisException e) { + LOG.warn("failed get backend, ", e); + } long brokerDeadNum = Env.getCurrentEnv().getBrokerMgr().getAllBrokers().stream().filter(b -> !b.isAlive) .count(); sb.append("doris_fe_frontend_dead_num").append(" ").append(feDeadNum).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java index 190d6d898a9..7acbe653e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendDistributedPlanWorkerManager.java @@ -32,7 +32,7 @@ import java.util.function.Supplier; public class BackendDistributedPlanWorkerManager implements DistributedPlanWorkerManager { private final Supplier<ImmutableMap<Long, Backend>> backends = Suppliers.memoize(() -> { try { - return Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster(); + return Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); } catch (Exception t) { throw new NereidsException("Can not get backends: " + t, t); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index 803afe05d8d..ab2798e2ba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -120,7 +120,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { scanRangeLocations = new ArrayList<>(); for (Long partitionID : selectedPartitionIds) { Long backendId = 
partitionIDToBackendID.get(partitionID); - Backend be = Env.getCurrentSystemInfo().getIdToBackend().get(backendId); + Backend be = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(backendId); if (!be.isAlive()) { throw new AnalysisException("backend " + be.getId() + " is not alive."); } @@ -134,7 +134,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { } } - private void computePartitionInfo() throws AnalysisException { + private void computePartitionInfo() throws UserException { List<Column> partitionColumns = new ArrayList<>(); for (SlotDescriptor slotDesc : desc.getSlots()) { if (BEACKEND_ID_COLUMN_SET.contains(slotDesc.getColumn().getName().toLowerCase())) { @@ -155,11 +155,11 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { * @param partitionColumns The Columns we want to create partitionInfo * @throws AnalysisException */ - private void createPartitionInfo(List<Column> partitionColumns) throws AnalysisException { + private void createPartitionInfo(List<Column> partitionColumns) throws UserException { backendPartitionInfo = new PartitionInfo(PartitionType.LIST, partitionColumns); partitionIDToBackendID = new HashMap<>(); long partitionID = 0; - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { if (be.isAlive()) { // create partition key PartitionKey partitionKey = new PartitionKey(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index 361a84f9fe8..6e8bd4f08cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -731,7 +731,7 @@ public class PolicyMgr implements Writable { // log alter Env.getCurrentEnv().getEditLog().logAlterStoragePolicy(storagePolicy); AgentBatchTask batchTask = new 
AgentBatchTask(); - for (long backendId : Env.getCurrentSystemInfo().getIdToBackend().keySet()) { + for (long backendId : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().keySet()) { PushStoragePolicyTask pushStoragePolicyTask = new PushStoragePolicyTask(backendId, Collections.singletonList(storagePolicy), Collections.emptyList(), Collections.emptyList()); batchTask.addTask(pushStoragePolicyTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b07e2d7304c..4ee46d3bec3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -565,7 +565,7 @@ public class Coordinator implements CoordInterface { currentConnectFE = coordAddress; } - this.idToBackend = Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster(); + this.idToBackend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster(); if (LOG.isDebugEnabled()) { int backendNum = idToBackend.size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index e25323440bb..3cba759abc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -110,7 +110,7 @@ public class InsertStreamTxnExecutor { throw new UserException("No available backend to match the policy: " + policy); } - Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); + Backend backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(beIds.get(0)); txnConf.setUserIp(backend.getHost()); txnEntry.setBackend(backend); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java index 7bb02b0c6c8..1edde03203e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java @@ -22,9 +22,14 @@ import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; + public class QueryCancelWorker extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(QueryCancelWorker.class); private SystemInfoService systemInfoService; public QueryCancelWorker(SystemInfoService systemInfoService) { @@ -33,7 +38,13 @@ public class QueryCancelWorker extends MasterDaemon { @Override protected void runAfterCatalogReady() { - List<Backend> allBackends = systemInfoService.getAllBackends(); + List<Backend> allBackends; + try { + allBackends = systemInfoService.getAllBackendsByAllCluster().values().asList(); + } catch (Exception e) { + LOG.warn("failed to get backends by current cluster", e); + return; + } for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) { Status status = co.shouldCancel(allBackends); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 50a21fe4e3a..7340f2affcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -2824,7 +2824,7 @@ public class ShowExecutor { private void handleAdminShowTabletStorageFormat() throws AnalysisException { List<List<String>> resultRowSet = Lists.newArrayList(); - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) { if (be.isQueryAvailable() && 
be.isLoadAvailable()) { AgentClient client = new AgentClient(be.getHost(), be.getBePort()); TCheckStorageFormatResult result = client.checkStorageFormat(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 485f1ae7a8c..adb8ecf61bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1643,7 +1643,8 @@ public class StmtExecutor { throw new UserException(e.getMessage()); } LOG.info("kill query {}", queryId); - Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo().getIdToBackend().values(); + Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo() + .getAllBackendsByAllCluster().values(); for (Backend be : nodesToPublish) { if (be.isAlive()) { try { @@ -2084,7 +2085,7 @@ public class StmtExecutor { // 4. get BE TNetworkAddress address = null; - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { if (be.isAlive()) { address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java index 529454b9fa3..11fc547e6b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java @@ -18,6 +18,7 @@ package org.apache.doris.qe.cache; import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; import org.apache.doris.proto.Types; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.system.Backend; @@ -110,7 +111,7 @@ public class CacheCoordinator { } try { belock.lock(); - ImmutableMap<Long, Backend> idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); + 
ImmutableMap<Long, Backend> idToBackend = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); if (idToBackend != null) { if (!debugModel) { clearBackend(idToBackend); @@ -120,6 +121,9 @@ public class CacheCoordinator { } } this.lastRefreshTime = System.currentTimeMillis(); + } catch (UserException e) { + LOG.warn("cant get backend", e); + throw new RuntimeException(e); } finally { belock.unlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java index ad4e9f94c05..1b9e8913c7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java @@ -17,6 +17,7 @@ package org.apache.doris.resource; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.Status; import org.apache.doris.common.util.MasterDaemon; @@ -31,7 +32,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -70,7 +70,13 @@ public class AdmissionControl extends MasterDaemon { this.isAllBeMemoryEnough = true; return; } - Collection<Backend> backends = clusterInfoService.getIdToBackend().values(); + List<Backend> backends; + try { + backends = clusterInfoService.getAllBackendsByAllCluster().values().asList(); + } catch (AnalysisException e) { + LOG.warn("get backends failed", e); + throw new RuntimeException(e); + } this.currentMemoryLimit = Config.query_queue_by_be_used_memory; boolean tmpIsAllBeMemoryEnough = true; List<Future<InternalService.PGetBeResourceResponse>> futureList = new ArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8bf07e81e27..5f59f4c1a69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3887,7 +3887,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setStatus(new TStatus(TStatusCode.OK)); final SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); - List<Backend> backends = systemInfoService.getAllBackends(); + List<Backend> backends = systemInfoService.getAllBackendsByAllCluster().values().asList(); for (Backend backend : backends) { TBackend tBackend = new TBackend(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 6a8008d6cbc..515db76b096 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; @@ -108,7 +109,14 @@ public class HeartbeatMgr extends MasterDaemon { List<TFrontendInfo> feInfos = Env.getCurrentEnv().getFrontendInfos(); List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList(); // send backend heartbeat - for (Backend backend : nodeMgr.getIdToBackend().values()) { + List<Backend> bes; + try { + bes = nodeMgr.getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("can not get backends", e); + return; + } + for (Backend backend : bes) { BackendHeartbeatHandler handler = new 
BackendHeartbeatHandler(backend, feInfos); hbResponses.add(executor.submit(handler)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index a9b48888e3f..836d516c942 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -199,7 +199,7 @@ public class SystemInfoService { // for test public void addBackend(Backend backend) { - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); copiedBackends.put(backend.getId(), backend); ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends); idToBackendRef = newIdToBackend; @@ -209,7 +209,7 @@ public class SystemInfoService { private void addBackend(String host, int heartbeatPort, Map<String, String> tagMap) { Backend newBackend = new Backend(Env.getCurrentEnv().getNextId(), host, heartbeatPort); // update idToBackend - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); copiedBackends.put(newBackend.getId(), newBackend); ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends); idToBackendRef = newIdToBackend; @@ -271,7 +271,7 @@ public class SystemInfoService { .getHostPortInAccessibleFormat(host, heartbeatPort) + "]"); } // update idToBackend - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); copiedBackends.remove(droppedBackend.getId()); ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends); idToBackendRef = newIdToBackend; @@ -299,27 +299,11 @@ public class SystemInfoService { } public Backend getBackend(long 
backendId) { - return idToBackendRef.get(backendId); - } - - public boolean checkBackendLoadAvailable(long backendId) { - Backend backend = idToBackendRef.get(backendId); - if (backend == null || !backend.isLoadAvailable()) { - return false; - } - return true; - } - - public boolean checkBackendQueryAvailable(long backendId) { - Backend backend = idToBackendRef.get(backendId); - if (backend == null || !backend.isQueryAvailable()) { - return false; - } - return true; + return getAllClusterBackendsNoException().get(backendId); } public boolean checkBackendScheduleAvailable(long backendId) { - Backend backend = idToBackendRef.get(backendId); + Backend backend = getAllClusterBackendsNoException().get(backendId); if (backend == null || !backend.isScheduleAvailable()) { return false; } @@ -327,7 +311,7 @@ public class SystemInfoService { } public boolean checkBackendAlive(long backendId) { - Backend backend = idToBackendRef.get(backendId); + Backend backend = getAllClusterBackendsNoException().get(backendId); if (backend == null || !backend.isAlive()) { return false; } @@ -335,7 +319,7 @@ public class SystemInfoService { } public Backend getBackendWithHeartbeatPort(String host, int heartPort) { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); for (Backend backend : idToBackend.values()) { if (backend.getHost().equals(host) && backend.getHeartbeatPort() == heartPort) { return backend; @@ -345,7 +329,7 @@ public class SystemInfoService { } public Backend getBackendWithBePort(String ip, int bePort) { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); for (Backend backend : idToBackend.values()) { if (backend.getHost().equals(ip) && backend.getBePort() == bePort) { return backend; @@ -355,7 +339,7 @@ public class SystemInfoService { } public Backend getBackendWithHttpPort(String ip, int httpPort) { 
- ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); for (Backend backend : idToBackend.values()) { if (backend.getHost().equals(ip) && backend.getHttpPort() == httpPort) { return backend; @@ -377,7 +361,7 @@ public class SystemInfoService { } public List<Long> getAllBackendIds(boolean needAlive) { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); List<Long> backendIds = Lists.newArrayList(idToBackend.keySet()); if (!needAlive) { return backendIds; @@ -394,7 +378,7 @@ public class SystemInfoService { } public List<Long> getDecommissionedBackendIds() { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); List<Long> backendIds = Lists.newArrayList(idToBackend.keySet()); Iterator<Long> iter = backendIds.iterator(); @@ -407,22 +391,20 @@ public class SystemInfoService { return backendIds; } - public List<Backend> getAllBackends() { - return Lists.newArrayList(idToBackendRef.values()); - } - public List<Backend> getMixBackends() { - return idToBackendRef.values().stream().filter(backend -> backend.isMixNode()).collect(Collectors.toList()); + return getAllClusterBackendsNoException().values() + .stream().filter(backend -> backend.isMixNode()).collect(Collectors.toList()); } public List<Backend> getCnBackends() { - return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList()); + return getAllClusterBackendsNoException() + .values().stream().filter(Backend::isComputeNode).collect(Collectors.toList()); } // return num of backends that from different hosts public int getStorageBackendNumFromDiffHosts(boolean aliveOnly) { Set<String> hosts = Sets.newHashSet(); - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = 
getAllClusterBackendsNoException(); for (Backend backend : idToBackend.values()) { if ((aliveOnly && !backend.isAlive()) || backend.isComputeNode()) { continue; @@ -493,7 +475,7 @@ public class SystemInfoService { TStorageMedium storageMedium, boolean isStorageMediumSpecified, boolean isOnlyForCheck) throws DdlException { - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap(); Map<Tag, Short> allocMap = replicaAlloc.getAllocMap(); short totalReplicaNum = 0; @@ -572,7 +554,7 @@ public class SystemInfoService { */ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { Preconditions.checkArgument(number >= -1); - List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values()); + List<Backend> candidates = policy.getCandidateBackends(getAllClusterBackendsNoException().values()); if (candidates.size() < number || candidates.isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Not match policy: {}. 
candidates num: {}, expected: {}", policy, candidates.size(), number); @@ -651,14 +633,6 @@ public class SystemInfoService { } } - public ImmutableMap<Long, Backend> getIdToBackend() { - return idToBackendRef; - } - - public ImmutableMap<Long, Backend> getAllBackendsMap() { - return idToBackendRef; - } - public long getBackendReportVersion(long backendId) { AtomicLong atomicLong = null; if ((atomicLong = idToReportVersionRef.get(backendId)) == null) { @@ -685,7 +659,7 @@ public class SystemInfoService { } public long saveBackends(CountingDataOutputStream dos, long checksum) throws IOException { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); int backendCount = idToBackend.size(); checksum ^= backendCount; dos.writeInt(backendCount); @@ -751,7 +725,7 @@ public class SystemInfoService { public void replayAddBackend(Backend newBackend) { // update idToBackend - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); copiedBackends.put(newBackend.getId(), newBackend); ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends); idToBackendRef = newIdToBackend; @@ -768,7 +742,7 @@ public class SystemInfoService { LOG.debug("replayDropBackend: {}", backend); } // update idToBackend - Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef); + Map<Long, Backend> copiedBackends = Maps.newHashMap(getAllClusterBackendsNoException()); copiedBackends.remove(backend.getId()); ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends); idToBackendRef = newIdToBackend; @@ -805,7 +779,7 @@ public class SystemInfoService { private long getAvailableCapacityB() { long capacity = 0L; - for (Backend backend : idToBackendRef.values()) { + for (Backend backend : getAllClusterBackendsNoException().values()) { // Here we do not check if backend 
is alive, // We suppose the dead backends will back to alive later. if (backend.isDecommissioned()) { @@ -825,12 +799,21 @@ public class SystemInfoService { } } + private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() { + try { + return getAllBackendsByAllCluster(); + } catch (AnalysisException e) { + LOG.warn("getAllClusterBackendsNoException: ", e); + return ImmutableMap.of(); + } + } + /* * Try to randomly get a backend id by given host. * If not found, return -1 */ public long getBackendIdByHost(String host) { - ImmutableMap<Long, Backend> idToBackend = idToBackendRef; + ImmutableMap<Long, Backend> idToBackend = getAllClusterBackendsNoException(); List<Backend> selectedBackends = Lists.newArrayList(); for (Backend backend : idToBackend.values()) { if (backend.getHost().equals(host)) { @@ -995,19 +978,19 @@ public class SystemInfoService { } // CloudSystemInfoService override - public List<Backend> getBackendsByCurrentCluster() throws UserException { - return idToBackendRef.values().stream().collect(Collectors.toList()); + public ImmutableMap<Long, Backend> getBackendsByCurrentCluster() throws AnalysisException { + return idToBackendRef; } - // CloudSystemInfoService override - public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException { - return getIdToBackend(); + // Cloud and NonCloud get all bes + public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws AnalysisException { + return idToBackendRef; } public int getMinPipelineExecutorSize() { List<Backend> currentBackends = null; try { - currentBackends = getBackendsByCurrentCluster(); + currentBackends = getAllBackendsByAllCluster().values().asList(); } catch (UserException e) { if (LOG.isDebugEnabled()) { LOG.debug("get current cluster backends failed: ", e); @@ -1026,8 +1009,4 @@ public class SystemInfoService { } return minPipelineExecutorSize; } - - public long aliveBECount() { - return 
idToBackendRef.values().stream().filter(Backend::isAlive).count(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java index fc2a6d6dd3e..629b410e676 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java @@ -18,7 +18,7 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; import org.apache.doris.planner.DataGenScanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; @@ -27,7 +27,7 @@ import org.apache.doris.thrift.TDataGenFunctionName; import java.util.List; public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf { - public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException; + public abstract List<TableValuedFunctionTask> getTasks() throws UserException; public abstract TDataGenFunctionName getDataGenFunctionName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 4c2866eb2b4..f586056dc58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -71,6 +71,7 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTextSerdeType; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ 
-396,9 +397,17 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio protected Backend getBackend() { // For the http stream task, we should obtain the be for processing the task + ImmutableMap<Long, Backend> beIdToBe; + try { + beIdToBe = Env.getCurrentSystemInfo().getBackendsByCurrentCluster(); + } catch (AnalysisException e) { + LOG.warn("get backend failed, ", e); + return null; + } + if (getTFileType() == TFileType.FILE_STREAM) { long backendId = ConnectContext.get().getBackendId(); - Backend be = Env.getCurrentSystemInfo().getIdToBackend().get(backendId); + Backend be = beIdToBe.get(backendId); if (be == null || !be.isAlive()) { LOG.warn("Backend {} is not alive", backendId); return null; @@ -406,7 +415,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return be; } } - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : beIdToBe.values()) { if (be.isAlive()) { return be; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java index 8d4a627043e..b40658e4796 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java @@ -116,7 +116,7 @@ public class NumbersTableValuedFunction extends DataGenTableValuedFunction { @Override public List<TableValuedFunctionTask> getTasks() throws AnalysisException { List<Backend> backendList = Lists.newArrayList(); - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { if (be.isAlive()) { backendList.add(be); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java 
index 177522638ea..35e8b6b91e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -94,7 +94,7 @@ public class AlterTest { Config.enable_odbc_mysql_broker_table = true; UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5); - List<Backend> backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); Map<String, String> tagMap = Maps.newHashMap(); tagMap.put(Tag.TYPE_LOCATION, "group_a"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java index aace70b6e3e..dfc0ed04269 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java @@ -19,7 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; import org.apache.doris.clone.RebalancerTestUtil; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.MockedAuth; import org.apache.doris.qe.ConnectContext; @@ -52,7 +52,7 @@ public class AdminCancelRebalanceDiskStmtTest { } @Test - public void testParticularBackends() throws AnalysisException { + public void testParticularBackends() throws UserException { List<String> backends = Lists.newArrayList( "192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051"); final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends); @@ -61,7 +61,7 @@ public class AdminCancelRebalanceDiskStmtTest { } @Test - public void testEmpty() throws AnalysisException { + public void 
testEmpty() throws UserException { List<String> backends = Lists.newArrayList(); final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends); stmt.analyze(analyzer); @@ -69,7 +69,7 @@ public class AdminCancelRebalanceDiskStmtTest { } @Test - public void testNull() throws AnalysisException { + public void testNull() throws UserException { final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(null); stmt.analyze(analyzer); Assert.assertEquals(4, stmt.getBackends().size()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java index 83328403c41..066bec09dc3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CopyIntoTest.java @@ -247,7 +247,7 @@ public class CopyIntoTest extends TestWithFeService { minTimes = 0; result = stages; - Env.getCurrentSystemInfo().getAllBackendsMap(); + Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); minTimes = 0; result = idToBackendRef; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java index 8b09b4b65b5..d9d41e34c0c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableElasticOnStorageMediumTest.java @@ -36,7 +36,7 @@ public class CreateTableElasticOnStorageMediumTest extends TestWithFeService { public void setStorageMediumToSSDTest() throws Exception { SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo(); - List<Backend> allBackends = clusterInfo.getAllBackends(); + List<Backend> allBackends = clusterInfo.getAllBackendsByAllCluster().values().asList(); // set all backends' storage medium to SSD for (Backend backend : 
allBackends) { if (backend.hasPathHash()) { @@ -59,7 +59,7 @@ public class CreateTableElasticOnStorageMediumTest extends TestWithFeService { public void setStorageMediumToHDDTest() throws Exception { SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo(); - List<Backend> allBackends = clusterInfo.getAllBackends(); + List<Backend> allBackends = clusterInfo.getAllBackendsByAllCluster().values().asList(); // set all backends' storage medium to SSD for (Backend backend : allBackends) { if (backend.hasPathHash()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index 419707f7cee..79093d6ed4b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -29,6 +29,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TStorageMedium; @@ -87,8 +88,8 @@ public class DynamicPartitionTableTest { UtFrameUtils.cleanDorisFeDir(runningDir); } - private static void changeBeDisk(TStorageMedium storageMedium) { - List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + private static void changeBeDisk(TStorageMedium storageMedium) throws UserException { + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { for (DiskInfo diskInfo : be.getDisks().values()) { diskInfo.setStorageMedium(storageMedium); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java index e98e6f74545..d83ba15c5bc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java @@ -68,7 +68,7 @@ public class ModifyBackendTest { @Test public void testModifyBackendTag() throws Exception { SystemInfoService infoService = Env.getCurrentSystemInfo(); - List<Backend> backends = infoService.getAllBackends(); + List<Backend> backends = infoService.getAllBackendsByAllCluster().values().asList(); Assert.assertEquals(1, backends.size()); String beHostPort = backends.get(0).getHost() + ":" + backends.get(0).getHeartbeatPort(); @@ -76,7 +76,7 @@ public class ModifyBackendTest { String stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('tag.location' = 'zone1')"; AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Env.getCurrentEnv(), stmt); - backends = infoService.getAllBackends(); + backends = infoService.getAllBackendsByAllCluster().values().asList(); Assert.assertEquals(1, backends.size()); // create table @@ -175,13 +175,13 @@ public class ModifyBackendTest { @Test public void testModifyBackendAvailableProperty() throws Exception { SystemInfoService infoService = Env.getCurrentSystemInfo(); - List<Backend> backends = infoService.getAllBackends(); + List<Backend> backends = infoService.getAllBackendsByAllCluster().values().asList(); String beHostPort = backends.get(0).getHost() + ":" + backends.get(0).getHeartbeatPort(); // modify backend available property String stmtStr = "alter system modify backend \"" + beHostPort + "\" set ('disable_query' = 'true', 'disable_load' = 'true')"; AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Env.getCurrentEnv(), stmt); - Backend backend = infoService.getAllBackends().get(0); + Backend 
backend = infoService.getAllBackendsByAllCluster().values().asList().get(0); Assert.assertFalse(backend.isQueryAvailable()); Assert.assertFalse(backend.isLoadAvailable()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java index dee048223b5..6441b9e9bba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java @@ -50,7 +50,7 @@ public class AddReplicaChoseMediumTest extends TestWithFeService { @Test public void testAddReplicaChoseMedium() throws Exception { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); Assertions.assertEquals(backendNum(), backends.size()); for (Backend be : backends) { Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId())); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java index 174001a1bcc..d903c68fcff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java @@ -19,6 +19,7 @@ package org.apache.doris.clone; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; +import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; import com.google.common.collect.Maps; @@ -37,10 +38,10 @@ public class BalanceStatistic { this.backendTotalReplicaNum = backendTotalReplicaNum; } - public static BalanceStatistic getCurrentBalanceStatistic() { + public static BalanceStatistic getCurrentBalanceStatistic() throws UserException { Map<Long, Long> 
backendTotalDataSize = Maps.newHashMap(); Map<Long, Integer> backendTotalReplicaNum = Maps.newHashMap(); - List<Backend> backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); backends.forEach(be -> { backendTotalDataSize.put(be.getId(), 0L); backendTotalReplicaNum.put(be.getId(), 0); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java index c560c62003a..0ff5adadb4c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java @@ -65,7 +65,7 @@ public class ColocateTableCheckerAndBalancerPerfTest { Config.disable_tablet_scheduler = true; UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 6); - backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { for (DiskInfo diskInfo : be.getDisks().values()) { diskInfo.setTotalCapacityB(10L << 40); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index d97e3a09549..81e29d3abf3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -80,7 +80,7 @@ public class DecommissionTest { // 127.0.0.3 // 127.0.0.4 UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4); - List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { Map<String, 
TDisk> backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); @@ -139,7 +139,7 @@ public class DecommissionTest { int totalReplicaNum = 1 * 2400; checkBalance(1, totalReplicaNum, 4); - Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0); + Backend backend = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList().get(0); String decommissionStmtStr = "alter system decommission backend \"" + backend.getHost() + ":" + backend.getHeartbeatPort() + "\""; AlterSystemStmt decommissionStmt = diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java index 40b6683da3d..860a36f8f63 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -65,7 +65,7 @@ public class DiskReblanceWhenSchedulerIdle extends TestWithFeService { public void testDiskReblanceWhenSchedulerIdle() throws Exception { // case start TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends(); + List<Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); Assertions.assertEquals(backendNum(), backends.size()); for (Backend be : backends) { Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId())); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 356d8ed6422..c02afa1db08 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -131,7 +131,7 @@ public class TabletRepairAndBalanceTest { Env.getCurrentEnv().createDb(createDbStmt); // 
must set disk info, or the tablet scheduler won't work - backends = Env.getCurrentSystemInfo().getAllBackends(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { Map<String, TDisk> backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index 7d918ef7db5..97f3f0d878a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -92,7 +92,7 @@ public class TabletReplicaTooSlowTest { Env.getCurrentEnv().createDb(createDbStmt); // must set disk info, or the tablet scheduler won't work - backends = Env.getCurrentSystemInfo().getAllBackends(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { Map<String, TDisk> backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index d4bbf6e8899..28a3c713a19 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -74,7 +74,7 @@ public class DecommissionBackendTest extends TestWithFeService { // 1. create connect context connectContext = createDefaultCtx(); - ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Assertions.assertEquals(backendNum(), idToBackendRef.size()); // 2. 
create database db1 @@ -105,11 +105,11 @@ public class DecommissionBackendTest extends TestWithFeService { Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); while (System.currentTimeMillis() - startTimestamp < 90000 - && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) { + && Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId())) { Thread.sleep(1000); } - Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); // For now, we have pre-built internal table: analysis_job and column_statistics Assertions.assertEquals(tabletNum, @@ -117,14 +117,14 @@ public class DecommissionBackendTest extends TestWithFeService { // 6. add backend addNewBackend(); - Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); } @Test public void testDecommissionBackendById() throws Exception { // 1. create connect context connectContext = createDefaultCtx(); - ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Assertions.assertEquals(backendNum(), idToBackendRef.size()); // 2. 
create database db1 @@ -158,15 +158,15 @@ public class DecommissionBackendTest extends TestWithFeService { Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); while (System.currentTimeMillis() - startTimestamp < 90000 - && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) { + && Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId())) { Thread.sleep(1000); } - Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); // add backend addNewBackend(); - Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); } @@ -177,7 +177,7 @@ public class DecommissionBackendTest extends TestWithFeService { SystemInfoService infoService = Env.getCurrentSystemInfo(); - ImmutableMap<Long, Backend> idToBackendRef = infoService.getIdToBackend(); + ImmutableMap<Long, Backend> idToBackendRef = infoService.getAllBackendsByAllCluster(); Assertions.assertEquals(backendNum(), idToBackendRef.size()); // 2. 
create database db3 @@ -220,12 +220,12 @@ public class DecommissionBackendTest extends TestWithFeService { long startTimestamp = System.currentTimeMillis(); while (System.currentTimeMillis() - startTimestamp < 90000 - && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) { + && Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId())) { Thread.sleep(1000); } // BE has been dropped successfully - Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); // tbl1 has been dropped successfully final String sql = "show create table db3.tbl1;"; @@ -242,7 +242,7 @@ public class DecommissionBackendTest extends TestWithFeService { dropTable("db3.tbl1", false); addNewBackend(); - Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); } @Test @@ -250,7 +250,7 @@ public class DecommissionBackendTest extends TestWithFeService { // 1. create connect context connectContext = createDefaultCtx(); - ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend(); + ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); Assertions.assertEquals(backendNum(), idToBackendRef.size()); // 2. 
create database db1 @@ -321,11 +321,11 @@ public class DecommissionBackendTest extends TestWithFeService { Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); while (System.currentTimeMillis() - startTimestamp < 90000 - && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) { + && Env.getCurrentSystemInfo().getAllBackendsByAllCluster().containsKey(srcBackend.getId())) { Thread.sleep(1000); } - Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); // For now, we have pre-built internal table: analysis_job and column_statistics Assertions.assertEquals(tabletNum, @@ -337,6 +337,6 @@ public class DecommissionBackendTest extends TestWithFeService { // 6. add backend addNewBackend(); - Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getAllBackendsByAllCluster().size()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java index 40c207631e7..c48ba030e77 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java @@ -311,7 +311,7 @@ public class SystemInfoServiceTest { DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(file))); long checksum2 = systemInfoService.loadBackends(dis, 0); Assert.assertEquals(checksum1, checksum2); - Assert.assertEquals(1, systemInfoService.getIdToBackend().size()); + Assert.assertEquals(1, systemInfoService.getAllBackendsByAllCluster().size()); Backend back2 = systemInfoService.getBackend(1); Assert.assertEquals(back1, back2); dis.close(); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java index 3a239287342..f03a4282d9a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.persist.EditLog; @@ -63,7 +64,7 @@ public class AutoBucketUtilsTest { private static void createClusterWithBackends(int beNum, int diskNum, long diskCapacity) throws Exception { UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum); // must set disk info, or the tablet scheduler won't work - backends = Env.getCurrentSystemInfo().getAllBackends(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { setDiskInfos(diskNum, diskCapacity, be); } @@ -101,7 +102,7 @@ public class AutoBucketUtilsTest { } private void expectations(Env env, EditLog editLog, SystemInfoService systemInfoService, - ImmutableMap<Long, Backend> backends) { + ImmutableMap<Long, Backend> backends) throws AnalysisException { new Expectations() { { Env.getServingEnv(); @@ -112,7 +113,7 @@ public class AutoBucketUtilsTest { minTimes = 0; result = systemInfoService; - systemInfoService.getAllBackendsMap(); + systemInfoService.getAllBackendsByAllCluster(); minTimes = 0; result = backends; diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index 1f4b49b2026..00467541372 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -156,7 +156,7 @@ public class CanalSyncDataTest { minTimes = 0; result = backendIds; - systemInfoService.getIdToBackend(); + systemInfoService.getAllBackendsByAllCluster(); minTimes = 0; result = backendMap; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index 82f46862674..6933511d4e3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -357,9 +357,10 @@ public class FederationBackendPolicyTest { int localHostNum = random.nextInt(3 - 1) + 1; Set<String> localHosts = new HashSet<>(); String localHost; + List<Backend> backends = service.getAllBackendsByAllCluster().values().asList(); for (int j = 0; j < localHostNum; ++j) { do { - localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + localHost = backends.get(random.nextInt(backends.size())).getHost(); } while (!localHosts.add(localHost)); totalLocalHosts.add(localHost); } @@ -480,9 +481,10 @@ public class FederationBackendPolicyTest { int localHostNum = random.nextInt(3 - 1) + 1; Set<String> localHosts = new HashSet<>(); String localHost; + List<Backend> backends = service.getAllBackendsByAllCluster().values().asList(); for (int j = 0; j < localHostNum; ++j) { do { - localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost(); + localHost = backends.get(random.nextInt(backends.size())).getHost(); } while (!localHosts.add(localHost)); totalLocalHosts.add(localHost); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 55ee219db61..207bddae1b3 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -109,7 +109,7 @@ public class ResourceTagQueryTest { Env.getCurrentEnv().createDb(createDbStmt); // must set disk info, or the tablet scheduler won't work - backends = Env.getCurrentSystemInfo().getAllBackends(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { Map<String, TDisk> backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 7481e9ffd19..d3238328814 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -29,8 +29,8 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; import org.apache.doris.common.proc.BackendsProcDir; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.planner.OlapScanNode; @@ -80,7 +80,7 @@ public class DemoMultiBackendsTest { @BeforeClass public static void beforeClass() throws EnvVarNotSetException, IOException, - FeStartException, NotInitException, DdlException, InterruptedException { + FeStartException, NotInitException, UserException, InterruptedException { FeConstants.runningUnitTest = true; FeConstants.default_scheduler_interval_millisecond = 100; Config.tablet_checker_interval_ms = 1000; @@ -89,7 +89,7 @@ public class DemoMultiBackendsTest { UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 3); // must set disk info, or the tablet 
scheduler won't work - backends = Env.getCurrentSystemInfo().getAllBackends(); + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); for (Backend be : backends) { Map<String, TDisk> backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 2cd27b0968d..ddec61e6d43 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1636,7 +1636,6 @@ class Suite implements GroovyInterceptable { } else { endpoint context.config.metaServiceHttpAddress } - endpoint context.config.metaServiceHttpAddress uri "/MetaService/http/drop_cluster?token=${token}" body request_body check check_func diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy new file mode 100644 index 00000000000..13af1209e99 --- /dev/null +++ b/regression-test/suites/cloud_p0/multi_cluster/test_tvf.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import groovy.json.JsonSlurper + +suite('test_tvf_in_cloud', 'multi_cluster') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + ] + options.cloudMode = true + + def testCase = { + for (def i = 0; i < 100; i++) { + def ret = sql """select * from numbers("number" = "100")""" + assertEquals(ret.size(), 100) + test { + // current cloud not implement it + sql """select START_VERSION,END_VERSION from information_schema.rowsets""" + exception "_get_all_rowsets is not implemented" + } + } + } + + docker(options) { + def clusterName = "newcluster1" + // add a new cluster (add_new_cluster) + cluster.addBackend(3, clusterName) + + def result = sql """show clusters""" + logger.info("show cluster1 : {}", result) + def tag = getCloudBeTagByName(clusterName) + logger.info("tag = {}", tag) + + def jsonSlurper = new JsonSlurper() + def jsonObject = jsonSlurper.parseText(tag) + def cloudClusterId = jsonObject.cloud_cluster_id + // multi cluster env + + // current cluster + testCase.call() + // use other cluster + def ret = sql_return_maparray """show clusters""" + def currentCluster = ret.stream().filter(cluster -> cluster.is_current == "TRUE").findFirst().orElse(null) + def otherCluster = ret.stream().filter(cluster -> cluster.is_current == "FALSE").findFirst().orElse(null) + assertTrue(otherCluster != null) + sql """use @${otherCluster.cluster}""" + testCase.call() + + // call the HTTP API to drop the cluster added by add_new_cluster + def ms = cluster.getAllMetaservices().get(0) + logger.info("ms addr={}, port={}", ms.host, ms.httpPort) + drop_cluster(clusterName, cloudClusterId, ms) + Thread.sleep(5000) + result = sql """show clusters""" + logger.info("show cluster2 : {}", result) + + // single cluster env + // use old clusterName, has been dropped + test { 
+ sql """select * from numbers("number" = "100")""" + exception "in cloud maybe this cluster has been dropped" + } + // switch to old cluster + sql """use @${currentCluster.cluster}""" + testCase.call() + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org