This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 725811e1d1 [Improvement](pipeline) Terminate early for short-circuit
join (#23378) (#23396)
725811e1d1 is described below
commit 725811e1d15fcee0bed6accf08b5ac55e1813822
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 24 12:32:12 2023 +0800
[Improvement](pipeline) Terminate early for short-circuit join (#23378)
(#23396)
---
be/src/exec/exec_node.h | 2 ++
be/src/pipeline/exec/operator.h | 4 ++++
be/src/pipeline/pipeline_task.cpp | 8 +++----
be/src/pipeline/pipeline_task.h | 41 ++++++++++++++++++++++++++++++++--
be/src/vec/exec/join/vjoin_node_base.h | 2 ++
5 files changed, 51 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ad7eb83074..ae7b40dc20 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -134,6 +134,8 @@ public:
bool can_read() const { return _can_read; }
+ [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
// Sink Data to ExecNode to do some stock work, both need impl with
method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index acf55cb7bc..38ed45ed89 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -199,6 +199,8 @@ public:
virtual bool can_write() { return false; } // for sink
+ [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
/**
* The main method to execute a pipeline task.
* Now it is a pull-based pipeline and operators pull data from its child
by this method.
@@ -321,6 +323,8 @@ public:
~StreamingOperator() override = default;
+ [[nodiscard]] bool can_terminate_early() override { return
_node->can_terminate_early(); }
+
Status prepare(RuntimeState* state) override {
_node->increase_ref();
_use_projection = _node->has_output_row_descriptor();
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 4d0fb49de8..645028a3dc 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -222,11 +222,11 @@ Status PipelineTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
- if (!_source->can_read()) {
+ if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
- if (!_sink->can_write()) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
@@ -234,11 +234,11 @@ Status PipelineTask::execute(bool* eos) {
this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
- if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
+ if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
- if (!_sink->can_write()) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 62de1fd281..34382a3f7c 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -152,13 +152,50 @@ public:
return false;
}
- bool source_can_read() { return _source->can_read(); }
+ // True when the source operator can be read, or when ignore_blocking_source() says blocking on it may be skipped.
+ bool source_can_read() {
+ return _source->can_read() || ignore_blocking_source();
+ }
bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}
- bool sink_can_write() { return _sink->can_write(); }
+ bool sink_can_write() { return _sink->can_write() ||
ignore_blocking_sink(); }
+ /**
+ * Consider the query plan below:
+ *
+ * ExchangeSource JoinBuild1
+ * \ /
+ * JoinProbe1 (Right Outer) JoinBuild2
+ * \ /
+ * JoinProbe2 (Right Outer)
+ * |
+ * Sink
+ *
+ * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should
not be blocked by ExchangeSource
+ * because it is already determined that JoinProbe1/JoinProbe2 will
also output 0 rows.
+ *
+ * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked
by Sink because JoinProbe2 will
+ * produce more data.
+ *
+ * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be
blocked by ExchangeSource
+ * or Sink because JoinProbe2 will always produce 0 rows and terminate
early.
+ *
+ * In a nutshell, we should follow the rules:
+ * 1. if any operator in pipeline can terminate early, this task should
never be blocked by source operator.
+ * 2. if the last operator (except sink) can terminate early, this task
should never be blocked by sink operator.
+ */
+ [[nodiscard]] bool ignore_blocking_sink() { return
_root->can_terminate_early(); }
+
+ [[nodiscard]] bool ignore_blocking_source() {
+ for (size_t i = 1; i < _operators.size(); i++) {
+ if (_operators[i]->can_terminate_early()) {
+ return true;
+ }
+ }
+ return false;
+ }
Status finalize();
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 120e77785e..8756c24d20 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -74,6 +74,8 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
+ [[nodiscard]] bool can_terminate_early() override { return
_short_circuit_for_probe; }
+
protected:
// Construct the intermediate blocks to store the results from join
operation.
void _construct_mutable_join_block();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]