This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1f197f6c931 [fix](be) Fix time-sharing executor queued task count (#64855)
1f197f6c931 is described below

commit 1f197f6c931217f546463371865c4d756b1a6058
Author: Raiden <[email protected]>
AuthorDate: Fri Jun 26 18:47:36 2026 +0800

    [fix](be) Fix time-sharing executor queued task count (#64855)
    
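    Keep the executor's queued-task counter (_total_queued_tasks) consistent
    with the split queue when queued splits are removed before they execute.
    Previously, remove_task() and the executor destructor dropped splits from
    the split queue without decrementing the counter, and splits re-offered by
    the dispatch thread were queued without incrementing it, so the counter
    could drift from the real queue length; removed splits left it inflated.
    The fix routes every queue offer/remove through helpers that update
    _total_queued_tasks together with the split queue and the token state.
    Re-offered splits also reset their submit-time stopwatch before queuing.
    
    Add a BE unit test (regression coverage for this bug) that enqueues splits
    with no worker threads, removes their tasks, and verifies that the queue
    count returns to zero, so later submissions are not rejected as full.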
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
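    Problem Summary:
    Queued splits that were removed before execution (via remove_task() or
    executor destruction) were not subtracted from _total_queued_tasks. The
    counter therefore leaked, and once it reached max_queue_size the executor
    rejected new submissions as if the queue were full even though the queue
    was actually empty.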
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 .../time_sharing/time_sharing_task_executor.cpp    |  44 +++++++--
 .../time_sharing/time_sharing_task_executor.h      |   9 ++
 .../time_sharing_task_executor_test.cpp            | 101 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 8 deletions(-)

diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 5f22ba2e6f2..494568785e6 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -277,7 +277,7 @@ TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
         }
         {
             std::unique_lock<std::mutex> l(_lock);
-            _tokenless->_entries->remove_all(splits_to_destroy);
+            _remove_queued_splits_unlocked(splits_to_destroy);
         }
     }
 
@@ -422,7 +422,7 @@ Status 
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
     DCHECK(state == SplitThreadPoolToken::State::IDLE ||
            state == SplitThreadPoolToken::State::RUNNING);
     split->submit_time_watch().start();
-    _tokenless->_entries->offer(std::move(split));
+    _offer_split_unlocked(std::move(split));
     if (state == SplitThreadPoolToken::State::IDLE) {
         _tokenless->transition(SplitThreadPoolToken::State::RUNNING);
     }
@@ -434,8 +434,6 @@ Status 
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
     //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit 
them to the queue.
     //    2. If it is a CONCURRENT token, and there are still unsubmitted 
tasks, and the upper limit of concurrency is not reached,
     //       then submitted to the queue.
-    _total_queued_tasks++;
-
     // Wake up an idle thread for this task. Choosing the thread at the front 
of
     // the list ensures LIFO semantics as idling threads are also added to the 
front.
     //
@@ -571,7 +569,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
                         lock.unlock();
                         l.lock();
                         if (_tokenless->state() == 
SplitThreadPoolToken::State::RUNNING) {
-                            _tokenless->_entries->offer(split);
+                            split->submit_time_watch().reset();
+                            _offer_split_unlocked(split);
                         }
                         l.unlock();
                     } else {
@@ -587,7 +586,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
                                 split->reset_level_priority();
                                 std::unique_lock<std::mutex> l(_lock);
                                 if (_tokenless->state() == 
SplitThreadPoolToken::State::RUNNING) {
-                                    _tokenless->_entries->offer(split);
+                                    split->submit_time_watch().reset();
+                                    _offer_split_unlocked(split);
                                 }
                             } else {
                                 LOG(WARNING) << "blocked split is failed, 
split_id: "
@@ -771,7 +771,7 @@ Status 
TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
         }
         {
             std::unique_lock<std::mutex> l(_lock);
-            _tokenless->_entries->remove_all(splits_to_destroy);
+            _remove_queued_splits_unlocked(splits_to_destroy);
         }
     }
 
@@ -847,6 +847,36 @@ Status 
TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
     return _do_submit(prioritized_split);
 }
 
+void 
TimeSharingTaskExecutor::_offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner>
 split) {
+    _tokenless->_entries->offer(std::move(split));
+    ++_total_queued_tasks;
+}
+
+void TimeSharingTaskExecutor::_remove_queued_splits_unlocked(
+        const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+    if (splits.empty()) {
+        return;
+    }
+
+    const size_t queue_size_before = _tokenless->_entries->size();
+    _tokenless->_entries->remove_all(splits);
+    const size_t queue_size_after = _tokenless->_entries->size();
+    DCHECK_GE(queue_size_before, queue_size_after);
+
+    const auto removed = static_cast<int>(queue_size_before - 
queue_size_after);
+    DCHECK_GE(_total_queued_tasks, removed);
+    _total_queued_tasks -= removed;
+
+    if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING &&
+        _tokenless->_active_threads == 0 && _tokenless->_entries->size() == 0) 
{
+        _tokenless->transition(SplitThreadPoolToken::State::IDLE);
+    }
+
+    if (_total_queued_tasks == 0 && _active_threads == 0) {
+        _idle_cond.notify_all();
+    }
+}
+
 void 
TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner>
 split,
                                               const Status& status) {
     _completed_splits_per_level[split->priority().level()]++;
diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
index ba38ddb04da..13a42a1385c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
@@ -294,6 +294,15 @@ private:
     // // Submits a task to be run via token.
     Status _do_submit(std::shared_ptr<PrioritizedSplitRunner> split);
 
+    // Offer a split to the executor queue and keep _total_queued_tasks 
consistent.
+    // REQUIRES: _lock is held.
+    void _offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split);
+
+    // Remove queued splits and keep _total_queued_tasks/token state 
consistent.
+    // REQUIRES: _lock is held.
+    void _remove_queued_splits_unlocked(
+            const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& 
splits);
+
     //NOTE: not thread safe, caller should keep it thread-safe by using lock
     Status _try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
 
diff --git 
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp 
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index 0459121408e..ede32923d41 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -28,6 +28,7 @@
 #include <random>
 #include <thread>
 
+#include "common/exception.h"
 #include "exec/scan/task_executor/ticker.h"
 #include "exec/scan/task_executor/time_sharing/time_sharing_task_handle.h"
 
@@ -289,13 +290,59 @@ private:
     ListenableFuture<Void> _completion_future {};
 };
 
+class ThrowingSplitRunner : public SplitRunner {
+public:
+    explicit ThrowingSplitRunner(Status status) : _status(std::move(status)) {}
+
+    Status init() override { return Status::OK(); }
+
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) 
override {
+        throw Exception(_status);
+    }
+
+    void close(const Status& status) override {}
+
+    bool is_finished() override { return false; }
+
+    Status finished_status() override { return _status; }
+
+    std::string get_info() const override { return ""; }
+
+private:
+    Status _status;
+};
+
+class QueueOnlySplitRunner : public SplitRunner {
+public:
+    Status init() override { return Status::OK(); }
+
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) 
override {
+        _started = true;
+        _finished = true;
+        return SharedListenableFuture<Void>::create_ready();
+    }
+
+    void close(const Status& status) override {}
+
+    bool is_finished() override { return _finished.load(); }
+
+    Status finished_status() override { return Status::OK(); }
+
+    std::string get_info() const override { return "queue_only_split"; }
+
+    bool is_started() const { return _started.load(); }
+
+private:
+    std::atomic<bool> _started {false};
+    std::atomic<bool> _finished {false};
+};
+
 class TimeSharingTaskExecutorTest : public testing::Test {
 protected:
     void SetUp() override {}
 
     void TearDown() override {}
 
-private:
     template <typename Container>
     void assert_split_states(int end_index, const Container& splits) {
         for (int i = 0; i <= end_index; ++i) {
@@ -323,6 +370,58 @@ private:
     }
 };
 
+TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count) 
{
+    auto ticker = std::make_shared<TestingTicker>();
+
+    TimeSharingTaskExecutor::ThreadConfig thread_config;
+    thread_config.thread_name = "leak_repro";
+    thread_config.workload_group = "normal";
+    thread_config.max_thread_num = 0;
+    thread_config.min_thread_num = 0;
+    thread_config.max_queue_size = 2;
+    TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        for (int i = 0; i < thread_config.max_queue_size; ++i) {
+            auto task_handle = TEST_TRY(executor.create_task(
+                    TaskId("removed_task_" + std::to_string(i)), []() { return 
0.0; }, 1,
+                    std::chrono::milliseconds(1), std::optional<int>(1)));
+            auto split = std::make_shared<QueueOnlySplitRunner>();
+
+            auto enqueue_result = executor.enqueue_splits(task_handle, false, 
{split});
+            ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+            EXPECT_EQ(executor.waiting_splits_size(), 1);
+
+            ASSERT_TRUE(executor.remove_task(task_handle).ok());
+            EXPECT_FALSE(split->is_started());
+            EXPECT_EQ(executor.waiting_splits_size(), 0);
+            EXPECT_EQ(executor.get_queue_size(), 0);
+        }
+
+        EXPECT_EQ(executor.num_active_threads(), 0);
+        EXPECT_EQ(executor.waiting_splits_size(), 0);
+        EXPECT_EQ(executor.get_queue_size(), 0);
+
+        auto task_handle = TEST_TRY(executor.create_task(
+                TaskId("next_task"), []() { return 0.0; }, 1, 
std::chrono::milliseconds(1),
+                std::optional<int>(1)));
+        auto split = std::make_shared<QueueOnlySplitRunner>();
+
+        auto enqueue_result = executor.enqueue_splits(task_handle, false, 
{split});
+        ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+        EXPECT_EQ(executor.waiting_splits_size(), 1);
+        EXPECT_EQ(executor.get_queue_size(), 1);
+
+        static_cast<void>(executor.remove_task(task_handle));
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
 TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
     auto ticker = std::make_shared<TestingTicker>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to