This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 826e832500d [UT](pipeline) Add UT cases for task execution (#49866)
826e832500d is described below
commit 826e832500da6edd0a2255bb8551c5ee5c1b7906
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 9 14:39:05 2025 +0800
[UT](pipeline) Add UT cases for task execution (#49866)
---
be/src/common/status.h | 4 -
be/src/pipeline/exec/operator.h | 38 +++++-
be/src/pipeline/pipeline_task.cpp | 12 +-
be/test/pipeline/pipeline_task_test.cpp | 215 +++++++++++++++++++++++++++++++-
4 files changed, 253 insertions(+), 16 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 841d5090699..5e1761a5e57 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -274,8 +274,6 @@ namespace ErrorCode {
E(SEGCOMPACTION_INIT_READER, -3117, false); \
E(SEGCOMPACTION_INIT_WRITER, -3118, false); \
E(SEGCOMPACTION_FAILED, -3119, false); \
- E(PIP_WAIT_FOR_RF, -3120, false); \
- E(PIP_WAIT_FOR_SC, -3121, false); \
E(ROWSET_ADD_TO_BINLOG_FAILED, -3122, true); \
E(ROWSET_BINLOG_NOT_ONLY_ONE_VERSION, -3123, true); \
E(INVERTED_INDEX_INVALID_PARAMETERS, -6000, false); \
@@ -492,8 +490,6 @@ public:
ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
- ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
- ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0db9ee635e9..3062d0fc623 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1094,8 +1094,24 @@ public:
ENABLE_FACTORY_CREATOR(DummyOperatorLocalState);
DummyOperatorLocalState(RuntimeState* state, OperatorXBase* parent)
- : PipelineXLocalState<FakeSharedState>(state, parent) {}
+ : PipelineXLocalState<FakeSharedState>(state, parent) {
+ _tmp_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+ "DummyOperatorDependency",
true);
+ _finish_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"DummyOperatorDependency", true);
+ _filter_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"DummyOperatorDependency", true);
+ }
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
~DummyOperatorLocalState() = default;
+
+ std::vector<Dependency*> dependencies() const override { return
{_tmp_dependency.get()}; }
+ std::vector<Dependency*> filter_dependencies() override { return
{_filter_dependency.get()}; }
+
+private:
+ std::shared_ptr<Dependency> _tmp_dependency;
+ std::shared_ptr<Dependency> _finish_dependency;
+ std::shared_ptr<Dependency> _filter_dependency;
};
class DummyOperator final : public OperatorX<DummyOperatorLocalState> {
@@ -1108,17 +1124,31 @@ public:
*eos = _eos;
return Status::OK();
}
+ void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode
= true; }
private:
friend class AssertNumRowsLocalState;
bool _eos = false;
+ bool _low_memory_mode = false;
};
class DummySinkLocalState final : public
PipelineXSinkLocalState<BasicSharedState> {
public:
using Base = PipelineXSinkLocalState<BasicSharedState>;
ENABLE_FACTORY_CREATOR(DummySinkLocalState);
- DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) :
Base(parent, state) {}
+ DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) :
Base(parent, state) {
+ _tmp_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+ "DummyOperatorDependency",
true);
+ _finish_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"DummyOperatorDependency", true);
+ }
+
+ std::vector<Dependency*> dependencies() const override { return
{_tmp_dependency.get()}; }
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
+
+private:
+ std::shared_ptr<Dependency> _tmp_dependency;
+ std::shared_ptr<Dependency> _finish_dependency;
};
class DummySinkOperatorX final : public DataSinkOperatorX<DummySinkLocalState>
{
@@ -1128,6 +1158,10 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override {
return Status::OK();
}
+ void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode
= true; }
+
+private:
+ bool _low_memory_mode = false;
};
#endif
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 54a271ea553..ffe335a4189 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -226,11 +226,7 @@ Status PipelineTask::_open() {
SCOPED_TIMER(_open_timer);
_dry_run = _sink->should_dry_run(_state);
for (auto& o : _operators) {
- auto* local_state = _state->get_local_state(o->operator_id());
- auto st = local_state->open(_state);
- DCHECK(st.is<ErrorCode::PIP_WAIT_FOR_RF>() ?
!_filter_dependencies.empty() : true)
- << debug_string();
- RETURN_IF_ERROR(st);
+
RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state));
}
RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
RETURN_IF_ERROR(_extract_dependencies());
@@ -334,8 +330,10 @@ void PipelineTask::terminate() {
* @return
*/
Status PipelineTask::execute(bool* done) {
- DCHECK(_exec_state == State::RUNNABLE) << debug_string();
- DCHECK(_blocked_dep == nullptr) << debug_string();
+ if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr)
[[unlikely]] {
+ return Status::InternalError("Pipeline task is not runnable! Task
info: {}",
+ debug_string());
+ }
auto fragment_context = _fragment_context.lock();
DCHECK(fragment_context);
int64_t time_spent = 0;
diff --git a/be/test/pipeline/pipeline_task_test.cpp
b/be/test/pipeline/pipeline_task_test.cpp
index 456a36a6629..ef3ed753ff0 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -119,7 +119,7 @@ TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
}
-TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
+TEST_F(PipelineTaskTest, TEST_PREPARE) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
@@ -145,7 +145,6 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
profile.get(),
shared_state_map, task_id);
{
- // HAPPY PATH
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
@@ -154,7 +153,7 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
}
}
-TEST_F(PipelineTaskTest, TEST_PREPARE) {
+TEST_F(PipelineTaskTest, TEST_PREPARE_ERROR) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
@@ -189,4 +188,214 @@ TEST_F(PipelineTaskTest, TEST_PREPARE) {
}
}
+// Calling `_extract_dependencies()` on a task that was constructed but never `prepare()`d
+// must fail and must leave every dependency list empty.
+TEST_F(PipelineTaskTest, TEST_EXTRACT_DEPENDENCIES_ERROR) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    {
+        // Build a minimal pipeline: DummyOperator (source) -> DummySinkOperatorX (sink).
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+    {
+        // No `prepare()` call on purpose: the operators' local states presumably do not exist
+        // yet, so extraction is expected to fail without populating any dependency list.
+        EXPECT_FALSE(task->_extract_dependencies().ok());
+        EXPECT_TRUE(task->_read_dependencies.empty());
+        EXPECT_TRUE(task->_write_dependencies.empty());
+        EXPECT_TRUE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+    }
+}
+
+// After a successful `prepare()`, `_open()` opens every operator local state and the sink
+// local state, extracts the task's dependencies and marks the task as opened.
+TEST_F(PipelineTaskTest, TEST_OPEN) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    {
+        // Build a minimal pipeline: DummyOperator (source) -> DummySinkOperatorX (sink).
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    // NOTE(review): sizes RuntimeState's operator-id -> local-state table; -1 presumably
+    // corresponds to the single source operator's id — confirm against RuntimeState.
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        // Everything below needs a prepared task, so stop the test if preparation fails.
+        ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+    }
+    {
+        EXPECT_TRUE(task->_open().ok());
+        // The dummy source/sink expose read, write and finish dependencies; no spill one.
+        EXPECT_FALSE(task->_read_dependencies.empty());
+        EXPECT_FALSE(task->_write_dependencies.empty());
+        EXPECT_FALSE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_TRUE(task->_opened);
+    }
+}
+
+// Drives `execute()` through each blocking point in turn: the query's execution dependency,
+// the runtime-filter dependency, the source's read dependency (during `open`), and the
+// source's finish dependency after EOS; then closes and finalizes the task.
+TEST_F(PipelineTaskTest, TEST_EXECUTE) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    // Non-owning observers of dependencies owned by the local states. They start as nullptr
+    // so a failed earlier step can never leave them holding garbage.
+    Dependency* read_dep = nullptr;
+    Dependency* write_dep = nullptr;
+    Dependency* source_finish_dep = nullptr;
+    {
+        // Build a minimal pipeline: DummyOperator (source) -> DummySinkOperatorX (sink).
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+    task->set_task_queue(_task_queue.get());
+    {
+        // `execute` before `prepare` must be rejected: the task is not RUNNABLE yet.
+        bool done = false;
+        EXPECT_FALSE(task->execute(&done).ok());
+    }
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        // Later steps dereference `_filter_dependencies.front()`, so an empty list is fatal.
+        ASSERT_FALSE(task->_filter_dependencies.empty());
+        read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                           .value()
+                           ->dependencies()
+                           .front();
+        write_dep = _runtime_state->get_sink_local_state()->dependencies().front();
+        ASSERT_NE(read_dep, nullptr);
+        ASSERT_NE(write_dep, nullptr);
+    }
+    {
+        // task is blocked by execution dependency.
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_opened);
+        EXPECT_FALSE(_query_ctx->get_execution_dependency()->ready());
+        EXPECT_FALSE(_query_ctx->get_execution_dependency()->_blocked_task.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // Releasing the execution dependency makes the task RUNNABLE again; it then blocks
+        // on the (manually blocked) filter dependency before being opened.
+        _query_ctx->get_execution_dependency()->set_ready();
+        task->_filter_dependencies.front()->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_opened);
+        EXPECT_FALSE(task->_filter_dependencies.front()->ready());
+        EXPECT_FALSE(task->_filter_dependencies.front()->_blocked_task.empty());
+        EXPECT_TRUE(task->_read_dependencies.empty());
+        EXPECT_TRUE(task->_write_dependencies.empty());
+        EXPECT_TRUE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // `open` phase. And then task is blocked by read dependency.
+        task->_filter_dependencies.front()->set_ready();
+        read_dep->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_read_dependencies.empty());
+        EXPECT_FALSE(task->_write_dependencies.empty());
+        EXPECT_FALSE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_TRUE(task->_opened);
+        EXPECT_FALSE(read_dep->ready());
+        EXPECT_TRUE(write_dep->ready());
+        EXPECT_FALSE(read_dep->_blocked_task.empty());
+        source_finish_dep =
+                _runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                        .value()
+                        ->finishdependency();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+        // The next steps block/release this dependency, so it must exist.
+        ASSERT_NE(source_finish_dep, nullptr);
+    }
+    {
+        // `execute` phase. And then task is blocked by finish dependency.
+        read_dep->set_ready();
+        source_finish_dep->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        // Make the dummy source report EOS on its next `get_block`.
+        task->_operators.front()->cast<DummyOperator>()._eos = true;
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(source_finish_dep->ready());
+        EXPECT_FALSE(source_finish_dep->_blocked_task.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // `execute` phase: with the finish dependency released, the task reports done.
+        source_finish_dep->set_ready();
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_TRUE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+    }
+    {
+        EXPECT_TRUE(task->close(Status::OK()).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+        EXPECT_TRUE(task->finalize().ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+    }
+}
+
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]