This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7056f99d46a [profile](Brokerload) Support get broker load profile 
(#35110)
7056f99d46a is described below

commit 7056f99d46abe9865ec5a614c30892d5f1c3737a
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed May 22 15:15:57 2024 +0800

    [profile](Brokerload) Support get broker load profile (#35110)
---
 .../doris/common/profile/ExecutionProfile.java     | 11 +++
 .../org/apache/doris/common/profile/Profile.java   |  1 +
 .../apache/doris/common/util/ProfileManager.java   | 83 +++++++++++++++++-----
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  4 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 16 +++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 ++
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  2 +-
 7 files changed, 100 insertions(+), 21 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 76420045be6..e8b450b530c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -250,8 +250,14 @@ public class ExecutionProfile {
     }
 
     public Status updateProfile(TQueryProfile profile, TNetworkAddress 
backendHBAddress, boolean isDone) {
+        if (!profile.isSetQueryId()) {
+            LOG.warn("QueryId is not set");
+            return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not 
set");
+        }
+
         if (isPipelineXProfile) {
             if (!profile.isSetFragmentIdToProfile()) {
+                LOG.warn("{} FragmentIdToProfile is not set", 
DebugUtil.printId(profile.getQueryId()));
                 return new Status(TStatusCode.INVALID_ARGUMENT, 
"FragmentIdToProfile is not set");
             }
 
@@ -266,6 +272,7 @@ public class ExecutionProfile {
                     RuntimeProfile profileNode = new RuntimeProfile(name);
                     taskProfile.add(profileNode);
                     if (!pipelineProfile.isSetProfile()) {
+                        LOG.warn("Profile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
                         return new Status(TStatusCode.INVALID_ARGUMENT, 
"Profile is not set");
                     }
 
@@ -278,10 +285,13 @@ public class ExecutionProfile {
             }
         } else {
             if (!profile.isSetInstanceProfiles() || 
!profile.isSetFragmentInstanceIds()) {
+                LOG.warn("InstanceIdToProfile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
                 return new Status(TStatusCode.INVALID_ARGUMENT, 
"InstanceIdToProfile is not set");
             }
 
             if (profile.fragment_instance_ids.size() != 
profile.instance_profiles.size()) {
+                LOG.warn("InstanceIdToProfile size is not equal, {}",
+                        DebugUtil.printId(profile.getQueryId()));
                 return new Status(TStatusCode.INVALID_ARGUMENT, 
"InstanceIdToProfile size is not equal");
             }
 
@@ -289,6 +299,7 @@ public class ExecutionProfile {
                 TUniqueId instanceId = 
profile.getFragmentInstanceIds().get(idx);
                 TRuntimeProfileTree instanceProfile = 
profile.getInstanceProfiles().get(idx);
                 if (instanceProfile == null) {
+                    LOG.warn("Profile is not set {}", 
DebugUtil.printId(profile.getQueryId()));
                     return new Status(TStatusCode.INVALID_ARGUMENT, "Profile 
is not set");
                 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index da0790ff57c..f94e0d235ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -140,6 +140,7 @@ public class Profile {
             }
         }
         try {
+            // For load task, they will have multiple execution_profiles.
             for (ExecutionProfile executionProfile : executionProfiles) {
                 builder.append("\n");
                 executionProfile.getRoot().prettyPrint(builder, "");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 5b36d81dd68..eaeb3635c58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.ClientPool;
@@ -29,6 +30,7 @@ import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.ProfileTreeBuilder;
 import org.apache.doris.common.profile.ProfileTreeNode;
 import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.qe.CoordInterface;
 import org.apache.doris.qe.QeProcessorImpl;
@@ -326,28 +328,72 @@ public class ProfileManager {
         return resp;
     }
 
-    public String getProfile(String queryID) {
-        TUniqueId thriftQueryId = Util.parseTUniqueIdFromString(queryID);
-        List<TNetworkAddress> involvedBackends = null;
-        if (thriftQueryId != null) {
-            CoordInterface coor = 
QeProcessorImpl.INSTANCE.getCoordinator(thriftQueryId);
-            if (coor != null) {
-                involvedBackends = coor.getInvolvedBackends();
-            }
+    private List<Future<TGetRealtimeExecStatusResponse>> 
createFetchRealTimeProfileTasks(String id) {
+        // For query, id is queryId, for load, id is LoadLoadingTaskId
+        class QueryIdAndAddress {
+            public TUniqueId id;
+            public TNetworkAddress beAddress;
         }
 
         List<Future<TGetRealtimeExecStatusResponse>> futures = 
Lists.newArrayList();
+        TUniqueId queryId = Util.parseTUniqueIdFromString(id);
+        List<QueryIdAndAddress> involvedBackends = Lists.newArrayList();
+
+        if (queryId != null) {
+            CoordInterface coor = 
QeProcessorImpl.INSTANCE.getCoordinator(queryId);
 
-        if (involvedBackends != null) {
-            for (TNetworkAddress beAddress : involvedBackends) {
-                Callable<TGetRealtimeExecStatusResponse> task = () -> {
-                    return getRealtimeQueryProfile(thriftQueryId, beAddress);
-                };
-                Future<TGetRealtimeExecStatusResponse> future = 
fetchRealTimeProfileExecutor.submit(task);
-                futures.add(future);
+            if (coor != null) {
+                for (TNetworkAddress addr : coor.getInvolvedBackends()) {
+                    QueryIdAndAddress tmp = new QueryIdAndAddress();
+                    tmp.id = queryId;
+                    tmp.beAddress = addr;
+                    involvedBackends.add(tmp);
+                }
             }
+        } else {
+            Long loadJobId = (long) -1;
+            try {
+                loadJobId = Long.parseLong(id);
+            } catch (Exception e) {
+                return futures;
+            }
+
+            LoadJob loadJob = 
Env.getCurrentEnv().getLoadManager().getLoadJob(loadJobId);
+            if (loadJob.getLoadTaskIds() == null) {
+                return futures;
+            }
+
+            for (TUniqueId taskId : loadJob.getLoadTaskIds()) {
+                CoordInterface coor = 
QeProcessorImpl.INSTANCE.getCoordinator(taskId);
+                if (coor != null) {
+                    if (coor.getInvolvedBackends() != null) {
+                        for (TNetworkAddress beAddress : 
coor.getInvolvedBackends()) {
+                            QueryIdAndAddress tmp = new QueryIdAndAddress();
+                            tmp.id = taskId;
+                            tmp.beAddress = beAddress;
+                            involvedBackends.add(tmp);
+                        }
+                    } else {
+                        LOG.warn("Involved backends is null, load job {}, task 
{}", id, DebugUtil.printId(taskId));
+                    }
+                }
+            }
+        }
+
+        for (QueryIdAndAddress idAndAddress : involvedBackends) {
+            Callable<TGetRealtimeExecStatusResponse> task = () -> {
+                return getRealtimeQueryProfile(idAndAddress.id, 
idAndAddress.beAddress);
+            };
+            Future<TGetRealtimeExecStatusResponse> future = 
fetchRealTimeProfileExecutor.submit(task);
+            futures.add(future);
         }
 
+        return futures;
+    }
+
+    public String getProfile(String id) {
+        List<Future<TGetRealtimeExecStatusResponse>> futures = 
createFetchRealTimeProfileTasks(id);
+
         // beAddr of reportExecStatus of QeProcessorImpl is meaningless, so 
assign a dummy address
         // to avoid compile failing.
         TNetworkAddress dummyAddr = new TNetworkAddress();
@@ -358,18 +404,17 @@ public class ProfileManager {
                     
QeProcessorImpl.INSTANCE.reportExecStatus(resp.getReportExecStatusParams(), 
dummyAddr);
                 }
             } catch (Exception e) {
-                LOG.warn("Failed to get real-time profile, query {}, error: 
{}",
-                        DebugUtil.printId(thriftQueryId), e.getMessage(), e);
+                LOG.warn("Failed to get real-time profile, id {}, error: {}", 
id, e.getMessage(), e);
             }
         }
 
         if (!futures.isEmpty()) {
-            LOG.info("Get real-time exec status finished, query {}", queryID);
+            LOG.info("Get real-time exec status finished, id {}", id);
         }
 
         readLock.lock();
         try {
-            ProfileElement element = queryIdToProfileMap.get(queryID);
+            ProfileElement element = queryIdToProfileMap.get(id);
             if (element == null) {
                 return null;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fd70e03caa7..275cd371cdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -238,6 +238,8 @@ public class BrokerLoadJob extends BulkLoadJob {
             this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + 
label, true,
                     
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")),
                     false);
+            // profile is registered in ProfileManager, so that we can get 
realtime profile
+            jobProfile.updateSummary(loadStartTimestamp, 
getSummaryInfo(false), false, null);
         }
         ProgressManager progressManager = Env.getCurrentProgressManager();
         progressManager.registerProgressSimple(String.valueOf(id));
@@ -391,7 +393,7 @@ public class BrokerLoadJob extends BulkLoadJob {
             builder.endTime(TimeUtils.longToTimeString(currentTimestamp));
             builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - 
createTimestamp));
         }
-        builder.taskState("FINISHED");
+        builder.taskState(isFinished ? "FINISHED" : "RUNNING");
         builder.user(getUserInfo() != null ? getUserInfo().getQualifiedUser() 
: "N/A");
         builder.defaultDb(getDefaultDb());
         builder.sqlStatement(getOriginStmt().originStmt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 65befadb13a..f28394957bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -700,6 +700,22 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         failMsg = loadJobFinalOperation.getFailMsg();
     }
 
+    public List<TUniqueId> getLoadTaskIds() {
+        readLock();
+        try {
+            List<TUniqueId> res = Lists.newArrayList();
+            for (LoadTask task : idToTasks.values()) {
+                if (task instanceof LoadLoadingTask) {
+                    LoadLoadingTask loadLoadingTask = (LoadLoadingTask) task;
+                    res.add(loadLoadingTask.getLoadId());
+                }
+            }
+            return res;
+        } finally {
+            readUnlock();
+        }
+    }
+
     public List<Comparable> getShowInfo() throws DdlException {
         readLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 07b5daefaa1..4467a7150ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -389,6 +389,10 @@ public class Coordinator implements CoordInterface {
         // 
https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155
         this.enablePipelineEngine = Config.enable_pipeline_load;
         this.enablePipelineXEngine = Config.enable_pipeline_load;
+        // make sure Coordinator can update profile correctlly
+        if (this.enablePipelineXEngine) {
+            this.executionProfile.setPipelineX();
+        }
     }
 
     private void setFromUserProperty(ConnectContext connectContext) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index f6b4671b20a..b762fdda87c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -247,7 +247,7 @@ public final class QeProcessorImpl implements QeProcessor {
                 }
             } else {
                 LOG.warn("Invalid report profile req, this is a logical error, 
BE must set backendId and isDone"
-                            + " at same time, query id: {}" + 
DebugUtil.printId(params.query_id));
+                            + " at same time, query id: {}", 
DebugUtil.printId(params.query_id));
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to