github-actions[bot] commented on code in PR #60567:
URL: https://github.com/apache/doris/pull/60567#discussion_r3121982896


##########
fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java:
##########
@@ -212,19 +213,23 @@ public void unregisterQuery(TUniqueId queryId) {
     @Override
     public Map<String, QueryStatisticsItem> getQueryStatistics() {
         final Map<String, QueryStatisticsItem> querySet = Maps.newHashMap();
+        final Map<String, TQueryStatistics> queryStatisticsMap =

Review Comment:
   This call path makes `show proc '/current_queries'` rely on 
`WorkloadRuntimeStatusMgr.getQueryStatisticsMap()`, but that helper is still 
explicitly written under the assumption that it only runs before 
`clearReportTimeoutBeStatistics()` and therefore skips null 
checks/snapshotting. Once we invoke it from a synchronous proc/REST request, 
the daemon thread can remove a BE/query between the `keySet()` snapshot and the 
`get(...)`, so `beToQueryStatsMap.get(beId)` or `queryStatsMap.get(queryId)` 
can become `null` and the listing intermittently fails with an NPE. Please expose 
a thread-safe snapshot accessor before calling it here.
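
A minimal sketch of such an accessor follows. It assumes the per-BE store is a `ConcurrentHashMap` of `ConcurrentHashMap`s; the real field types in `WorkloadRuntimeStatusMgr` may differ. `QueryStatsSnapshot`, `Stats`, `report`, `removeBackend` and `snapshot` are hypothetical names, and `Stats` stands in for `TQueryStatistics`. Iterating `entrySet()` yields each key together with its value. A concurrent `remove()` can therefore make an entry drop out of the view, but it can never turn a lookup into `null`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the BE -> query statistics store; names are illustrative only.
class QueryStatsSnapshot {
    /** Hypothetical immutable stand-in for TQueryStatistics with two counters. */
    static final class Stats {
        final long scanRows;
        final long cpuMs;

        Stats(long scanRows, long cpuMs) {
            this.scanRows = scanRows;
            this.cpuMs = cpuMs;
        }

        Stats plus(Stats other) {
            return new Stats(scanRows + other.scanRows, cpuMs + other.cpuMs);
        }
    }

    // beId -> (queryId -> latest stats); a cleanup thread may remove entries at any time.
    private final Map<Long, Map<String, Stats>> beToQueryStats = new ConcurrentHashMap<>();

    void report(long beId, String queryId, Stats stats) {
        beToQueryStats.computeIfAbsent(beId, k -> new ConcurrentHashMap<>()).put(queryId, stats);
    }

    // Models what the report-timeout cleanup does concurrently with readers.
    void removeBackend(long beId) {
        beToQueryStats.remove(beId);
    }

    // Safe to call from a proc/REST request thread: there is no keySet()-then-get()
    // window, because each entry is read as a (key, value) pair in one step.
    Map<String, Stats> snapshot() {
        Map<String, Stats> merged = new HashMap<>();
        for (Map.Entry<Long, Map<String, Stats>> be : beToQueryStats.entrySet()) {
            for (Map.Entry<String, Stats> query : be.getValue().entrySet()) {
                // Stats is immutable, so merging never mutates reporter-owned objects.
                merged.merge(query.getKey(), query.getValue(), Stats::plus);
            }
        }
        return merged;
    }
}
```

If the maps are instead plain `HashMap`s guarded by a lock, the same copy loop would need to run while holding that lock.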



##########
fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java:
##########
@@ -49,7 +49,7 @@ private ProcService() {
         root.register("trash", new TrashProcDir());
         root.register("monitor", new MonitorProcDir());
         root.register("current_queries", new CurrentQueryStatisticsProcDir());
-        root.register("current_query_stmts", new CurrentQueryStatementsProcNode());
+        root.register("current_query_stmts", new CurrentQueryStatisticsProcDir());

Review Comment:
   `current_query_stmts` used to expose the 8-column statement-only view from 
`CurrentQueryStatementsProcNode`. Repointing it to 
`CurrentQueryStatisticsProcDir` changes both schema and semantics, so any 
existing `show proc '/current_query_stmts'` caller will break immediately. If 
the goal is to enrich `current_queries`, this proc entry should stay on the old 
node for compatibility.



##########
fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java:
##########
@@ -47,15 +56,7 @@ public boolean register(String name, ProcNodeInterface node) {
 
     @Override
     public ProcNodeInterface lookup(String name) throws AnalysisException {
-        if (Strings.isNullOrEmpty(name)) {
-            return null;
-        }
-        final Map<String, QueryStatisticsItem> statistic = QeProcessorImpl.INSTANCE.getQueryStatistics();
-        final QueryStatisticsItem item = statistic.get(name);
-        if (item == null) {
-            throw new AnalysisException(name + " doesn't exist.");
-        }
-        return new CurrentQuerySqlProcDir(item);

Review Comment:
   This removes the existing proc subtree entirely. Before this PR, `show proc 
'/current_queries/<query_id>'` returned the SQL row and `show proc 
'/current_queries/<query_id>/fragments'` returned per-fragment instance stats 
via `CurrentQuerySqlProcDir` / `CurrentQueryFragmentProcNode`. Throwing here 
makes both paths fail for every caller, which is a user-visible regression 
unrelated to the new progress columns. Please keep the old lookup path working 
(or add a new proc entry for the richer top-level view) instead of 
hard-disabling it.



##########
be/src/runtime/query_context.cpp:
##########
@@ -558,4 +558,29 @@ Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32
     return Status::OK();
 }
 
+void QueryContext::get_task_counts(int* total, int* finished) {
+    *total = 0;
+    *finished = 0;
+    {
+        // Load counts from already closed and cleaned-up fragments (archived in QueryContext)
+        std::lock_guard<std::mutex> lock(_task_counts_lock);
+        *total = _finished_task_counts.total;
+        *finished = _finished_task_counts.finished;
+    }
+    // Add counts from currently active fragments
+    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+    for (auto& [_, fragment_ctx_weak] : _fragment_id_to_pipeline_ctx) {
+        if (auto fragment_ctx = fragment_ctx_weak.lock()) {
+            *total += fragment_ctx->get_total_tasks();
+            *finished += fragment_ctx->get_closed_tasks();

Review Comment:
   `_closed_tasks` is mutated under `PipelineFragmentContext::_task_mutex` in 
`decrement_running_task()`, but this read is only protected by 
`_pipeline_map_write_lock`, which does not synchronize with that mutex. 
`RuntimeQueryStatisticsMgr` can call this concurrently with task completion, so 
`FinishedTasks` is currently read through a real C++ data race (undefined behavior). There is also an 
accounting bug here: `_close_fragment_instance()` archives a finished fragment 
into `_finished_task_counts` before the weak_ptr disappears from 
`_fragment_id_to_pipeline_ctx`, so this loop can briefly double-count the same 
fragment. Please use a synchronized accessor (or an atomic counter) and make 
sure finished fragments are no longer visible in the live map before archiving 
them.
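
One possible shape of both fixes is modeled below. It is written in Java for consistency with the FE code in this review; the actual change belongs in `query_context.cpp`. `FragmentModel`, `QueryTaskCounts` and their methods are hypothetical names. The live map holds strong references here, whereas `_fragment_id_to_pipeline_ctx` holds `weak_ptr`s. `closedTasks` is atomic, so a reader does not need the per-fragment task mutex. `closeFragment` removes a fragment from the live map and archives it inside the same lock that `getTaskCounts` takes, so any reader sees the fragment either as live or as archived, never both.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a pipeline fragment context.
final class FragmentModel {
    final int totalTasks;
    // Atomic so readers outside the fragment's task mutex still see a well-defined value.
    final AtomicInteger closedTasks = new AtomicInteger();

    FragmentModel(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    void closeOneTask() {
        closedTasks.incrementAndGet();
    }
}

// Hypothetical model of the query-level task accounting.
final class QueryTaskCounts {
    private final Object lock = new Object();
    private final Map<Integer, FragmentModel> liveFragments = new HashMap<>();
    private int archivedTotal;
    private int archivedFinished;

    void addFragment(int fragmentId, FragmentModel fragment) {
        synchronized (lock) {
            liveFragments.put(fragmentId, fragment);
        }
    }

    // Remove-then-archive happens in one critical section, so no reader can
    // observe the fragment in both the live map and the archived totals.
    void closeFragment(int fragmentId) {
        synchronized (lock) {
            FragmentModel fragment = liveFragments.remove(fragmentId);
            if (fragment != null) {
                archivedTotal += fragment.totalTasks;
                archivedFinished += fragment.closedTasks.get();
            }
        }
    }

    // Returns {total, finished}, summing archived and live fragments under one lock.
    int[] getTaskCounts() {
        synchronized (lock) {
            int total = archivedTotal;
            int finished = archivedFinished;
            for (FragmentModel fragment : liveFragments.values()) {
                total += fragment.totalTasks;
                finished += fragment.closedTasks.get();
            }
            return new int[] {total, finished};
        }
    }
}
```

In the C++ code, the corresponding changes might be making `_closed_tasks` a `std::atomic<int>` (or reading it under `_task_mutex`) and performing the erase from `_fragment_id_to_pipeline_ctx` together with the update of `_finished_task_counts` under a single lock.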



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.