This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5bf8a1ded1 [Bug](pipeline) make sink operator process eos signals
after wake_up_early (#45207)
a5bf8a1ded1 is described below
commit a5bf8a1ded1477b3e1b3fb1b21ecffbcb81d4b36
Author: Pxl <[email protected]>
AuthorDate: Tue Dec 17 10:35:58 2024 +0800
[Bug](pipeline) make sink operator process eos signals after wake_up_early
(#45207)
1. Make the sink operator process eos signals after wake_up_early.
2. Set wake_up_early when the pipeline task meets `wake_up_by_downstream`,
when the sink reaches its limit, or when the sink gets an EOF status.
3. Close the non-sink operators after the sink meets eos.
---
be/src/exprs/runtime_filter.cpp | 6 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 50 ++++------
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 2 +-
be/src/pipeline/pipeline.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 116 +++++++++++------------
be/src/pipeline/pipeline_task.h | 8 +-
6 files changed, 87 insertions(+), 97 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8f297d7074f..d1567a8fa79 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -498,12 +498,12 @@ public:
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
- _context->ignored = true;
+ set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >=
_max_in_num) {
- _context->ignored = true;
+ set_ignored();
// release in filter
_context->hybrid_set.reset();
}
@@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t
global_size) {
}
void IRuntimeFilter::set_ignored() {
- _wrapper->_context->ignored = true;
+ _wrapper->set_ignored();
}
bool IRuntimeFilter::get_ignored() {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index b2a79a941f7..19e8493e596 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -135,26 +135,16 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
}
}};
- if (!_runtime_filter_slots || _runtime_filters.empty() ||
state->is_cancelled()) {
+ if (!_runtime_filter_slots || _runtime_filters.empty() ||
state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}
try {
- if (state->get_task()->wake_up_by_downstream()) {
- if (_should_build_hash_table) {
- // partitial ignore rf to make global rf work
- RETURN_IF_ERROR(
- _runtime_filter_slots->send_filter_size(state, 0,
_finish_dependency));
- RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
- } else {
- // do not publish filter coz local rf not inited and useless
- return Base::close(state, exec_status);
- }
+ if (state->get_task()->wake_up_early()) {
+ // partitial ignore rf to make global rf work or ignore useless rf
+ RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0,
_finish_dependency));
+ RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else if (_should_build_hash_table) {
- if (p._shared_hashtable_controller &&
- !p._shared_hash_table_context->complete_build_stage) {
- return Status::InternalError("close before sink meet eos");
- }
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
@@ -166,26 +156,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
- } else if ((p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled) ||
- (p._shared_hash_table_context &&
- !p._shared_hash_table_context->complete_build_stage)) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet
error state");
- } else {
- RETURN_IF_ERROR(
-
_runtime_filter_slots->copy_from_shared_context(p._shared_hash_table_context));
}
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state,
!_should_build_hash_table));
} catch (Exception& e) {
+ bool blocked_by_complete_build_stage = p._shared_hashtable_controller
&&
+
!p._shared_hash_table_context->complete_build_stage;
+ bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
+
p._shared_hashtable_controller &&
+
!p._shared_hash_table_context->signaled;
+
return Status::InternalError(
- "rf process meet error: {}, wake_up_by_downstream: {},
should_build_hash_table: "
- "{}, _finish_dependency: {}, complete_build_stage: {},
shared_hash_table_signaled: "
+ "rf process meet error: {}, wake_up_early: {},
should_build_hash_table: "
+ "{}, _finish_dependency: {}, blocked_by_complete_build_stage:
{}, "
+ "blocked_by_shared_hash_table_signal: "
"{}",
- e.to_string(), state->get_task()->wake_up_by_downstream(),
_should_build_hash_table,
- _finish_dependency->debug_string(),
- p._shared_hash_table_context &&
!p._shared_hash_table_context->complete_build_stage,
- p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled);
+ e.to_string(), state->get_task()->wake_up_early(),
_should_build_hash_table,
+ _finish_dependency->debug_string(),
blocked_by_complete_build_stage,
+ blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
@@ -479,7 +468,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit
strategy, we do not need to pull
// data from probe side.
@@ -556,6 +544,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
return _shared_hash_table_context->status;
}
+
RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
+ _shared_hash_table_context));
+
local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(
@@ -581,6 +572,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
}
if (eos) {
+ local_state._eos = true;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left
anti/semi join should not output null
// when the build side is not empty.
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 7c98a4c9f97..0db525f1bf5 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -242,7 +242,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
// If we use a short-circuit strategy, should return block directly by
add additional null data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
- *eos = local_state._probe_eos;
+ *eos = true;
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index e4678b7dcf3..6c39d361e59 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -112,7 +112,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
- task->set_wake_up_by_downstream();
+ task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 6814881ac7a..5ed725010ec 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -22,6 +22,7 @@
#include <glog/logging.h>
#include <stddef.h>
+#include <algorithm>
#include <ostream>
#include <vector>
@@ -223,9 +224,6 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
- if (_wake_up_by_downstream) {
- _eos = true;
- }
return true;
}
@@ -233,9 +231,6 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
- if (_wake_up_by_downstream) {
- _eos = true;
- }
return true;
}
}
@@ -257,9 +252,6 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
- if (_wake_up_by_downstream) {
- _eos = true;
- }
return true;
}
}
@@ -279,9 +271,6 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
- if (_wake_up_by_downstream) {
- _eos = true;
- }
return true;
}
}
@@ -289,15 +278,15 @@ bool PipelineTask::_is_blocked() {
}
Status PipelineTask::execute(bool* eos) {
- SCOPED_TIMER(_task_profile->total_time_counter());
- SCOPED_TIMER(_exec_timer);
- SCOPED_ATTACH_TASK(_state);
- _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
- *eos = _eos;
if (_eos) {
- // If task is waken up by finish dependency, `_eos` is set to true by
last execution, and we should return here.
+ *eos = true;
return Status::OK();
}
+
+ SCOPED_TIMER(_task_profile->total_time_counter());
+ SCOPED_TIMER(_exec_timer);
+ SCOPED_ATTACH_TASK(_state);
+
int64_t time_spent = 0;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
Status status = Status::Error<INTERNAL_ERROR>("fault_inject
pipeline_task execute failed");
@@ -320,27 +309,31 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
- if (_wake_up_by_downstream) {
- _eos = true;
- *eos = true;
- return Status::OK();
- }
+
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
+ if (_wake_up_early) {
+ *eos = true;
+ _eos = true;
+ return Status::OK();
+ }
RETURN_IF_ERROR(_open());
}
+ auto set_wake_up_and_dep_ready = [&]() {
+ if (wake_up_early()) {
+ return;
+ }
+ set_wake_up_early();
+ clear_blocking_state();
+ };
+
_task_profile->add_info_string("TaskState", "Runnable");
_task_profile->add_info_string("BlockedByDependency", "");
while (!_fragment_context->is_canceled()) {
if (_is_blocked()) {
return Status::OK();
}
- if (_wake_up_by_downstream) {
- _eos = true;
- *eos = true;
- return Status::OK();
- }
/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a
ready state (though it is not truly ready).
@@ -361,47 +354,47 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_sink->revoke_memory(_state));
continue;
}
- *eos = _eos;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
Status status =
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task
executing failed");
return status;
});
- // `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
- if (_dry_run || _sink->is_finished(_state)) {
- *eos = true;
- _eos = true;
- } else {
+ if (_sink->is_finished(_state)) {
+ set_wake_up_and_dep_ready();
+ }
+
+ // `_dry_run` means sink operator need no more data
+ *eos = wake_up_early() || _dry_run;
+ if (!*eos) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
}
+ if (*eos) {
+ RETURN_IF_ERROR(close(Status::OK(), false));
+ }
+
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
- Status status = Status::OK();
- // Define a lambda function to catch sink exception, because sink
will check
- // return error status with EOF, it is special, could not return
directly.
- auto sink_function = [&]() -> Status {
- Status internal_st;
- internal_st = _sink->sink(_state, block, *eos);
- return internal_st;
- };
- status = sink_function();
- if (!status.is<ErrorCode::END_OF_FILE>()) {
- RETURN_IF_ERROR(status);
+ Status status = _sink->sink(_state, block, *eos);
+
+ if (status.is<ErrorCode::END_OF_FILE>()) {
+ set_wake_up_and_dep_ready();
+ } else if (!status) {
+ return status;
}
- *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
+
if (*eos) { // just return, the scheduler will do finish work
- _eos = true;
_task_profile->add_info_string("TaskState", "Finished");
+ _eos = true;
return Status::OK();
}
}
}
- static_cast<void>(get_task_queue()->push_back(this));
+ RETURN_IF_ERROR(get_task_queue()->push_back(this));
return Status::OK();
}
@@ -470,17 +463,14 @@ void PipelineTask::finalize() {
_le_state_map.clear();
}
-Status PipelineTask::close(Status exec_status) {
+Status PipelineTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
- Defer defer {[&]() {
- if (_task_queue) {
- _task_queue->update_statistics(this, close_ns);
- }
- }};
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
- s = _sink->close(_state, exec_status);
+ if (close_sink) {
+ s = _sink->close(_state, exec_status);
+ }
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
@@ -489,10 +479,18 @@ Status PipelineTask::close(Status exec_status) {
}
}
if (_opened) {
- _fresh_profile_counter();
- COUNTER_SET(_close_timer, close_ns);
+ COUNTER_UPDATE(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}
+
+ if (close_sink && _opened) {
+ _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true"
: "false");
+ _fresh_profile_counter();
+ }
+
+ if (_task_queue) {
+ _task_queue->update_statistics(this, close_ns);
+ }
return s;
}
@@ -508,10 +506,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {},
finish = {}, dry run = "
- "{}, elapse time = {}s, _wake_up_by_downstream = {}], block
dependency = {}, is "
+ "{}, elapse time = {}s, _wake_up_early = {}], block
dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run,
elapsed,
- _wake_up_by_downstream.load(),
+ _wake_up_early.load(),
cur_blocked_dep && !_finalized ?
cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4bb062122c0..1a31e5954f4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -61,7 +61,7 @@ public:
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
- Status close(Status exec_status);
+ Status close(Status exec_status, bool close_sink = true);
PipelineFragmentContext* fragment_context() { return _fragment_context; }
@@ -135,7 +135,7 @@ public:
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }
- void set_wake_up_by_downstream() { _wake_up_by_downstream = true; }
+ void set_wake_up_early() { _wake_up_early = true; }
void clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
@@ -237,7 +237,7 @@ public:
PipelineId pipeline_id() const { return _pipeline->id(); }
- bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
+ bool wake_up_early() const { return _wake_up_early; }
private:
friend class RuntimeFilterDependency;
@@ -319,7 +319,7 @@ private:
std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
- std::atomic<bool> _wake_up_by_downstream = false;
+ std::atomic<bool> _wake_up_early = false;
};
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]