This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7056f99d46a [profile](Brokerload) Support get broker load profile (#35110) 7056f99d46a is described below commit 7056f99d46abe9865ec5a614c30892d5f1c3737a Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Wed May 22 15:15:57 2024 +0800 [profile](Brokerload) Support get broker load profile (#35110) --- .../doris/common/profile/ExecutionProfile.java | 11 +++ .../org/apache/doris/common/profile/Profile.java | 1 + .../apache/doris/common/util/ProfileManager.java | 83 +++++++++++++++++----- .../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 16 +++++ .../main/java/org/apache/doris/qe/Coordinator.java | 4 ++ .../java/org/apache/doris/qe/QeProcessorImpl.java | 2 +- 7 files changed, 100 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 76420045be6..e8b450b530c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -250,8 +250,14 @@ public class ExecutionProfile { } public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddress, boolean isDone) { + if (!profile.isSetQueryId()) { + LOG.warn("QueryId is not set"); + return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not set"); + } + if (isPipelineXProfile) { if (!profile.isSetFragmentIdToProfile()) { + LOG.warn("{} FragmentIdToProfile is not set", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "FragmentIdToProfile is not set"); } @@ -266,6 +272,7 @@ public class ExecutionProfile { RuntimeProfile profileNode = new RuntimeProfile(name); taskProfile.add(profileNode); if (!pipelineProfile.isSetProfile()) { + LOG.warn("Profile is not set, {}", 
DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); } @@ -278,10 +285,13 @@ public class ExecutionProfile { } } else { if (!profile.isSetInstanceProfiles() || !profile.isSetFragmentInstanceIds()) { + LOG.warn("InstanceIdToProfile is not set, {}", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile is not set"); } if (profile.fragment_instance_ids.size() != profile.instance_profiles.size()) { + LOG.warn("InstanceIdToProfile size is not equal, {}", + DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "InstanceIdToProfile size is not equal"); } @@ -289,6 +299,7 @@ public class ExecutionProfile { TUniqueId instanceId = profile.getFragmentInstanceIds().get(idx); TRuntimeProfileTree instanceProfile = profile.getInstanceProfiles().get(idx); if (instanceProfile == null) { + LOG.warn("Profile is not set {}", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index da0790ff57c..f94e0d235ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -140,6 +140,7 @@ public class Profile { } } try { + // For load task, they will have multiple execution_profiles. 
for (ExecutionProfile executionProfile : executionProfiles) { builder.append("\n"); executionProfile.getRoot().prettyPrint(builder, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 5b36d81dd68..eaeb3635c58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -17,6 +17,7 @@ package org.apache.doris.common.util; +import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.ClientPool; @@ -29,6 +30,7 @@ import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.qe.CoordInterface; import org.apache.doris.qe.QeProcessorImpl; @@ -326,28 +328,72 @@ public class ProfileManager { return resp; } - public String getProfile(String queryID) { - TUniqueId thriftQueryId = Util.parseTUniqueIdFromString(queryID); - List<TNetworkAddress> involvedBackends = null; - if (thriftQueryId != null) { - CoordInterface coor = QeProcessorImpl.INSTANCE.getCoordinator(thriftQueryId); - if (coor != null) { - involvedBackends = coor.getInvolvedBackends(); - } + private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id) { + // For query, id is queryId, for load, id is LoadLoadingTaskId + class QueryIdAndAddress { + public TUniqueId id; + public TNetworkAddress beAddress; } List<Future<TGetRealtimeExecStatusResponse>> futures = Lists.newArrayList(); + TUniqueId queryId = Util.parseTUniqueIdFromString(id); + List<QueryIdAndAddress> 
involvedBackends = Lists.newArrayList(); + + if (queryId != null) { + CoordInterface coor = QeProcessorImpl.INSTANCE.getCoordinator(queryId); - if (involvedBackends != null) { - for (TNetworkAddress beAddress : involvedBackends) { - Callable<TGetRealtimeExecStatusResponse> task = () -> { - return getRealtimeQueryProfile(thriftQueryId, beAddress); - }; - Future<TGetRealtimeExecStatusResponse> future = fetchRealTimeProfileExecutor.submit(task); - futures.add(future); + if (coor != null) { + for (TNetworkAddress addr : coor.getInvolvedBackends()) { + QueryIdAndAddress tmp = new QueryIdAndAddress(); + tmp.id = queryId; + tmp.beAddress = addr; + involvedBackends.add(tmp); + } } + } else { + Long loadJobId = (long) -1; + try { + loadJobId = Long.parseLong(id); + } catch (Exception e) { + return futures; + } + + LoadJob loadJob = Env.getCurrentEnv().getLoadManager().getLoadJob(loadJobId); + if (loadJob.getLoadTaskIds() == null) { + return futures; + } + + for (TUniqueId taskId : loadJob.getLoadTaskIds()) { + CoordInterface coor = QeProcessorImpl.INSTANCE.getCoordinator(taskId); + if (coor != null) { + if (coor.getInvolvedBackends() != null) { + for (TNetworkAddress beAddress : coor.getInvolvedBackends()) { + QueryIdAndAddress tmp = new QueryIdAndAddress(); + tmp.id = taskId; + tmp.beAddress = beAddress; + involvedBackends.add(tmp); + } + } else { + LOG.warn("Involved backends is null, load job {}, task {}", id, DebugUtil.printId(taskId)); + } + } + } + } + + for (QueryIdAndAddress idAndAddress : involvedBackends) { + Callable<TGetRealtimeExecStatusResponse> task = () -> { + return getRealtimeQueryProfile(idAndAddress.id, idAndAddress.beAddress); + }; + Future<TGetRealtimeExecStatusResponse> future = fetchRealTimeProfileExecutor.submit(task); + futures.add(future); } + return futures; + } + + public String getProfile(String id) { + List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id); + // beAddr of reportExecStatus of QeProcessorImpl 
is meaningless, so assign a dummy address // to avoid compile failing. TNetworkAddress dummyAddr = new TNetworkAddress(); @@ -358,18 +404,17 @@ public class ProfileManager { QeProcessorImpl.INSTANCE.reportExecStatus(resp.getReportExecStatusParams(), dummyAddr); } } catch (Exception e) { - LOG.warn("Failed to get real-time profile, query {}, error: {}", - DebugUtil.printId(thriftQueryId), e.getMessage(), e); + LOG.warn("Failed to get real-time profile, id {}, error: {}", id, e.getMessage(), e); } } if (!futures.isEmpty()) { - LOG.info("Get real-time exec status finished, query {}", queryID); + LOG.info("Get real-time exec status finished, id {}", id); } readLock.lock(); try { - ProfileElement element = queryIdToProfileMap.get(queryID); + ProfileElement element = queryIdToProfileMap.get(id); if (element == null) { return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index fd70e03caa7..275cd371cdd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -238,6 +238,8 @@ public class BrokerLoadJob extends BulkLoadJob { this.jobProfile = new Profile("BrokerLoadJob " + id + ". 
" + label, true, Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), false); + // profile is registered in ProfileManager, so that we can get realtime profile + jobProfile.updateSummary(loadStartTimestamp, getSummaryInfo(false), false, null); } ProgressManager progressManager = Env.getCurrentProgressManager(); progressManager.registerProgressSimple(String.valueOf(id)); @@ -391,7 +393,7 @@ public class BrokerLoadJob extends BulkLoadJob { builder.endTime(TimeUtils.longToTimeString(currentTimestamp)); builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - createTimestamp)); } - builder.taskState("FINISHED"); + builder.taskState(isFinished ? "FINISHED" : "RUNNING"); builder.user(getUserInfo() != null ? getUserInfo().getQualifiedUser() : "N/A"); builder.defaultDb(getDefaultDb()); builder.sqlStatement(getOriginStmt().originStmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 65befadb13a..f28394957bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -700,6 +700,22 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements failMsg = loadJobFinalOperation.getFailMsg(); } + public List<TUniqueId> getLoadTaskIds() { + readLock(); + try { + List<TUniqueId> res = Lists.newArrayList(); + for (LoadTask task : idToTasks.values()) { + if (task instanceof LoadLoadingTask) { + LoadLoadingTask loadLoadingTask = (LoadLoadingTask) task; + res.add(loadLoadingTask.getLoadId()); + } + } + return res; + } finally { + readUnlock(); + } + } + public List<Comparable> getShowInfo() throws DdlException { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 07b5daefaa1..4467a7150ad 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -389,6 +389,10 @@ public class Coordinator implements CoordInterface { // https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155 this.enablePipelineEngine = Config.enable_pipeline_load; this.enablePipelineXEngine = Config.enable_pipeline_load; + // make sure Coordinator can update profile correctly + if (this.enablePipelineXEngine) { + this.executionProfile.setPipelineX(); + } } private void setFromUserProperty(ConnectContext connectContext) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index f6b4671b20a..b762fdda87c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -247,7 +247,7 @@ public final class QeProcessorImpl implements QeProcessor { } } else { LOG.warn("Invalid report profile req, this is a logical error, BE must set backendId and isDone" - + " at same time, query id: {}" + DebugUtil.printId(params.query_id)); + + " at same time, query id: {}", DebugUtil.printId(params.query_id)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org