This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 1f197f6c931 [fix](be) Fix time-sharing executor queued task count (#64855)
1f197f6c931 is described below
commit 1f197f6c931217f546463371865c4d756b1a6058
Author: Raiden <[email protected]>
AuthorDate: Fri Jun 26 18:47:36 2026 +0800
[fix](be) Fix time-sharing executor queued task count (#64855)
Keep the executor queued-task metric consistent when queued splits are
removed before execution. The fix routes queue offer/remove operations
through helpers that update _total_queued_tasks together with the split
queue and token state.
Add a regression test that enqueues splits with no worker threads,
removes their tasks, and verifies the queue count returns to zero so
later submissions are not rejected as full.
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
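Problem Summary: `_total_queued_tasks` was incremented in `_do_submit`, but removing queued splits with `remove_all` (in `remove_task` and in the executor destructor) did not decrement it. Splits removed before execution therefore kept counting as queued, and later submissions could be rejected as if the queue were full.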
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here, e.g. https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
.../time_sharing/time_sharing_task_executor.cpp | 44 +++++++--
.../time_sharing/time_sharing_task_executor.h | 9 ++
.../time_sharing_task_executor_test.cpp | 101 ++++++++++++++++++++-
3 files changed, 146 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 5f22ba2e6f2..494568785e6 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -277,7 +277,7 @@ TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
}
{
std::unique_lock<std::mutex> l(_lock);
- _tokenless->_entries->remove_all(splits_to_destroy);
+ _remove_queued_splits_unlocked(splits_to_destroy);
}
}
@@ -422,7 +422,7 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
DCHECK(state == SplitThreadPoolToken::State::IDLE ||
state == SplitThreadPoolToken::State::RUNNING);
split->submit_time_watch().start();
- _tokenless->_entries->offer(std::move(split));
+ _offer_split_unlocked(std::move(split));
if (state == SplitThreadPoolToken::State::IDLE) {
_tokenless->transition(SplitThreadPoolToken::State::RUNNING);
}
@@ -434,8 +434,6 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
// 2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
// then submitted to the queue.
- _total_queued_tasks++;
-
// Wake up an idle thread for this task. Choosing the thread at the front of
// the list ensures LIFO semantics as idling threads are also added to the front.
//
@@ -571,7 +569,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
lock.unlock();
l.lock();
if (_tokenless->state() ==
SplitThreadPoolToken::State::RUNNING) {
- _tokenless->_entries->offer(split);
+ split->submit_time_watch().reset();
+ _offer_split_unlocked(split);
}
l.unlock();
} else {
@@ -587,7 +586,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
split->reset_level_priority();
std::unique_lock<std::mutex> l(_lock);
if (_tokenless->state() ==
SplitThreadPoolToken::State::RUNNING) {
- _tokenless->_entries->offer(split);
+ split->submit_time_watch().reset();
+ _offer_split_unlocked(split);
}
} else {
LOG(WARNING) << "blocked split is failed,
split_id: "
@@ -771,7 +771,7 @@ Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
}
{
std::unique_lock<std::mutex> l(_lock);
- _tokenless->_entries->remove_all(splits_to_destroy);
+ _remove_queued_splits_unlocked(splits_to_destroy);
}
}
@@ -847,6 +847,36 @@ Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
return _do_submit(prioritized_split);
}
+void
TimeSharingTaskExecutor::_offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner>
split) {
+ _tokenless->_entries->offer(std::move(split));
+ ++_total_queued_tasks;
+}
+
+void TimeSharingTaskExecutor::_remove_queued_splits_unlocked(
+ const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+ if (splits.empty()) {
+ return;
+ }
+
+ const size_t queue_size_before = _tokenless->_entries->size();
+ _tokenless->_entries->remove_all(splits);
+ const size_t queue_size_after = _tokenless->_entries->size();
+ DCHECK_GE(queue_size_before, queue_size_after);
+
+ const auto removed = static_cast<int>(queue_size_before -
queue_size_after);
+ DCHECK_GE(_total_queued_tasks, removed);
+ _total_queued_tasks -= removed;
+
+ if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING &&
+ _tokenless->_active_threads == 0 && _tokenless->_entries->size() == 0)
{
+ _tokenless->transition(SplitThreadPoolToken::State::IDLE);
+ }
+
+ if (_total_queued_tasks == 0 && _active_threads == 0) {
+ _idle_cond.notify_all();
+ }
+}
+
void
TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner>
split,
const Status& status) {
_completed_splits_per_level[split->priority().level()]++;
diff --git a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
index ba38ddb04da..13a42a1385c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
@@ -294,6 +294,15 @@ private:
// // Submits a task to be run via token.
Status _do_submit(std::shared_ptr<PrioritizedSplitRunner> split);
+ // Offer a split to the executor queue and keep _total_queued_tasks consistent.
+ // REQUIRES: _lock is held.
+ void _offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split);
+
+ // Remove queued splits and keep _total_queued_tasks/token state consistent.
+ // REQUIRES: _lock is held.
+ void _remove_queued_splits_unlocked(
+ const std::vector<std::shared_ptr<PrioritizedSplitRunner>>&
splits);
+
//NOTE: not thread safe, caller should keep it thread-safe by using lock
Status _try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
diff --git a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index 0459121408e..ede32923d41 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -28,6 +28,7 @@
#include <random>
#include <thread>
+#include "common/exception.h"
#include "exec/scan/task_executor/ticker.h"
#include "exec/scan/task_executor/time_sharing/time_sharing_task_handle.h"
@@ -289,13 +290,59 @@ private:
ListenableFuture<Void> _completion_future {};
};
+class ThrowingSplitRunner : public SplitRunner {
+public:
+ explicit ThrowingSplitRunner(Status status) : _status(std::move(status)) {}
+
+ Status init() override { return Status::OK(); }
+
+ Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds)
override {
+ throw Exception(_status);
+ }
+
+ void close(const Status& status) override {}
+
+ bool is_finished() override { return false; }
+
+ Status finished_status() override { return _status; }
+
+ std::string get_info() const override { return ""; }
+
+private:
+ Status _status;
+};
+
+class QueueOnlySplitRunner : public SplitRunner {
+public:
+ Status init() override { return Status::OK(); }
+
+ Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds)
override {
+ _started = true;
+ _finished = true;
+ return SharedListenableFuture<Void>::create_ready();
+ }
+
+ void close(const Status& status) override {}
+
+ bool is_finished() override { return _finished.load(); }
+
+ Status finished_status() override { return Status::OK(); }
+
+ std::string get_info() const override { return "queue_only_split"; }
+
+ bool is_started() const { return _started.load(); }
+
+private:
+ std::atomic<bool> _started {false};
+ std::atomic<bool> _finished {false};
+};
+
class TimeSharingTaskExecutorTest : public testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
-private:
template <typename Container>
void assert_split_states(int end_index, const Container& splits) {
for (int i = 0; i <= end_index; ++i) {
@@ -323,6 +370,58 @@ private:
}
};
+TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count)
{
+ auto ticker = std::make_shared<TestingTicker>();
+
+ TimeSharingTaskExecutor::ThreadConfig thread_config;
+ thread_config.thread_name = "leak_repro";
+ thread_config.workload_group = "normal";
+ thread_config.max_thread_num = 0;
+ thread_config.min_thread_num = 0;
+ thread_config.max_queue_size = 2;
+ TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+ ASSERT_TRUE(executor.init().ok());
+ ASSERT_TRUE(executor.start().ok());
+
+ try {
+ for (int i = 0; i < thread_config.max_queue_size; ++i) {
+ auto task_handle = TEST_TRY(executor.create_task(
+ TaskId("removed_task_" + std::to_string(i)), []() { return
0.0; }, 1,
+ std::chrono::milliseconds(1), std::optional<int>(1)));
+ auto split = std::make_shared<QueueOnlySplitRunner>();
+
+ auto enqueue_result = executor.enqueue_splits(task_handle, false,
{split});
+ ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+ EXPECT_EQ(executor.waiting_splits_size(), 1);
+
+ ASSERT_TRUE(executor.remove_task(task_handle).ok());
+ EXPECT_FALSE(split->is_started());
+ EXPECT_EQ(executor.waiting_splits_size(), 0);
+ EXPECT_EQ(executor.get_queue_size(), 0);
+ }
+
+ EXPECT_EQ(executor.num_active_threads(), 0);
+ EXPECT_EQ(executor.waiting_splits_size(), 0);
+ EXPECT_EQ(executor.get_queue_size(), 0);
+
+ auto task_handle = TEST_TRY(executor.create_task(
+ TaskId("next_task"), []() { return 0.0; }, 1,
std::chrono::milliseconds(1),
+ std::optional<int>(1)));
+ auto split = std::make_shared<QueueOnlySplitRunner>();
+
+ auto enqueue_result = executor.enqueue_splits(task_handle, false,
{split});
+ ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+ EXPECT_EQ(executor.waiting_splits_size(), 1);
+ EXPECT_EQ(executor.get_queue_size(), 1);
+
+ static_cast<void>(executor.remove_task(task_handle));
+ } catch (...) {
+ executor.stop();
+ throw;
+ }
+ executor.stop();
+}
+
TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
auto ticker = std::make_shared<TestingTicker>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]