This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b817197b2b [pipelineX](sink) simplify result sink dependency (#27226)
7b817197b2b is described below
commit 7b817197b2b8a9d6f7b3aac9c7724c8bae03e956
Author: Gabriel <[email protected]>
AuthorDate: Sun Nov 19 12:42:10 2023 +0800
[pipelineX](sink) simplify result sink dependency (#27226)
---
be/src/pipeline/exec/result_sink_operator.cpp | 26 +++-------------
be/src/pipeline/exec/result_sink_operator.h | 41 ++++---------------------
be/src/runtime/buffer_control_block.cpp | 44 +++++++++++----------------
be/src/runtime/buffer_control_block.h | 15 +++------
4 files changed, 33 insertions(+), 93 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index e27b0da1d32..321642bdba0 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -56,9 +56,6 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
SCOPED_TIMER(_open_timer);
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
- _wait_for_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForQueue",
timer_name);
- _wait_for_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBuffer",
timer_name);
- _wait_for_cancel_timer = ADD_CHILD_TIMER(_profile, "WaitForCancel",
timer_name);
auto fragment_instance_id = state->fragment_instance_id();
// create sender
std::shared_ptr<BufferControlBlock> sender = nullptr;
@@ -66,19 +63,9 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
state->fragment_instance_id(),
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
state->execution_timeout()));
_result_sink_dependency =
- OrDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _buffer_dependency =
- ResultBufferDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _cancel_dependency =
- CancelDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _result_sink_dependency->add_child(_cancel_dependency);
- _result_sink_dependency->add_child(_buffer_dependency);
- _queue_dependency =
- ResultQueueDependency::create_shared(_parent->operator_id(),
_parent->node_id());
- _result_sink_dependency->add_child(_queue_dependency);
-
- ((PipBufferControlBlock*)_sender.get())
- ->set_dependency(_buffer_dependency, _queue_dependency,
_cancel_dependency);
+ ResultSinkDependency::create_shared(_parent->operator_id(),
_parent->node_id());
+
+
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
return Status::OK();
}
@@ -176,13 +163,8 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
- COUNTER_UPDATE(_wait_for_queue_timer,
_queue_dependency->write_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_queue_dependency->write_watcher_elapse_time());
- COUNTER_SET(_wait_for_buffer_timer,
_buffer_dependency->write_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_buffer_dependency->write_watcher_elapse_time());
- COUNTER_SET(_wait_for_cancel_timer,
_cancel_dependency->write_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_cancel_dependency->write_watcher_elapse_time());
SCOPED_TIMER(exec_time_counter());
+ COUNTER_SET(_wait_for_dependency_timer,
_result_sink_dependency->write_watcher_elapse_time());
Status final_status = exec_status;
if (_writer) {
// close the writer
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index e117f55d25c..53e0292f68f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -43,33 +43,12 @@ public:
bool can_write() override;
};
-class ResultBufferDependency final : public WriteDependency {
+class ResultSinkDependency final : public WriteDependency {
public:
- ENABLE_FACTORY_CREATOR(ResultBufferDependency);
- ResultBufferDependency(int id, int node_id)
- : WriteDependency(id, node_id, "ResultBufferDependency") {}
- ~ResultBufferDependency() override = default;
-
- void* shared_state() override { return nullptr; }
-};
-
-class ResultQueueDependency final : public WriteDependency {
-public:
- ENABLE_FACTORY_CREATOR(ResultQueueDependency);
- ResultQueueDependency(int id, int node_id)
- : WriteDependency(id, node_id, "ResultQueueDependency") {}
- ~ResultQueueDependency() override = default;
-
- void* shared_state() override { return nullptr; }
-};
-
-class CancelDependency final : public WriteDependency {
-public:
- ENABLE_FACTORY_CREATOR(CancelDependency);
- CancelDependency(int id, int node_id) : WriteDependency(id, node_id,
"CancelDependency") {
- _ready_for_write = false;
- }
- ~CancelDependency() override = default;
+ ENABLE_FACTORY_CREATOR(ResultSinkDependency);
+ ResultSinkDependency(int id, int node_id)
+ : WriteDependency(id, node_id, "ResultSinkDependency") {}
+ ~ResultSinkDependency() override = default;
void* shared_state() override { return nullptr; }
};
@@ -93,15 +72,7 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
- std::shared_ptr<OrDependency> _result_sink_dependency;
- std::shared_ptr<pipeline::ResultBufferDependency> _buffer_dependency;
- std::shared_ptr<pipeline::ResultQueueDependency> _queue_dependency;
- std::shared_ptr<pipeline::CancelDependency> _cancel_dependency;
-
- RuntimeProfile::Counter* _wait_for_queue_timer = nullptr;
- RuntimeProfile::Counter* _wait_for_buffer_timer = nullptr;
- // time of prefilter input block from scanner
- RuntimeProfile::Counter* _wait_for_cancel_timer = nullptr;
+ std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
};
class ResultSinkOperatorX final : public
DataSinkOperatorX<ResultSinkLocalState> {
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 17f90889022..867f34a0127 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -269,60 +269,52 @@ Status BufferControlBlock::cancel() {
Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
RETURN_IF_ERROR(BufferControlBlock::add_batch(result));
- if (_buffer_dependency && _buffer_rows >= _buffer_limit) {
- _buffer_dependency->block_writing();
- }
+ _update_dependency();
return Status::OK();
}
Status
PipBufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result) {
RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result));
- if (_buffer_dependency && _buffer_rows >= _buffer_limit) {
- _buffer_dependency->block_writing();
- }
+ _update_dependency();
return Status::OK();
}
void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
BufferControlBlock::get_batch(ctx);
- if (_buffer_dependency && _buffer_rows < _buffer_limit) {
- _buffer_dependency->set_ready_for_write();
- }
+ _update_dependency();
}
Status
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result) {
RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result));
- if (_buffer_dependency && _buffer_rows < _buffer_limit) {
- _buffer_dependency->set_ready_for_write();
- }
+ _update_dependency();
return Status::OK();
}
Status PipBufferControlBlock::cancel() {
RETURN_IF_ERROR(BufferControlBlock::cancel());
- if (_cancel_dependency) {
- _cancel_dependency->set_ready_for_write();
- }
+ _update_dependency();
return Status::OK();
}
void PipBufferControlBlock::set_dependency(
- std::shared_ptr<pipeline::ResultBufferDependency> buffer_dependency,
- std::shared_ptr<pipeline::ResultQueueDependency> queue_dependency,
- std::shared_ptr<pipeline::CancelDependency> cancel_dependency) {
- _buffer_dependency = buffer_dependency;
- _queue_dependency = queue_dependency;
- _cancel_dependency = cancel_dependency;
+ std::shared_ptr<pipeline::ResultSinkDependency>
result_sink_dependency) {
+ _result_sink_dependency = result_sink_dependency;
+}
+
+void PipBufferControlBlock::_update_dependency() {
+ if (_result_sink_dependency &&
+ (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled))
{
+ _result_sink_dependency->set_ready_for_write();
+ } else if (_result_sink_dependency &&
+ (!_batch_queue_empty && _buffer_rows < _buffer_limit &&
!_is_cancelled)) {
+ _result_sink_dependency->block_writing();
+ }
}
void PipBufferControlBlock::_update_batch_queue_empty() {
_batch_queue_empty = _fe_result_batch_queue.empty() &&
_arrow_flight_batch_queue.empty();
- if (_queue_dependency && _batch_queue_empty) {
- _queue_dependency->set_ready_for_write();
- } else if (_queue_dependency) {
- _queue_dependency->block_writing();
- }
+ _update_dependency();
}
} // namespace doris
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 1b53193f917..c388cf7d500 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -48,9 +48,7 @@ class Controller;
namespace doris {
namespace pipeline {
-class ResultBufferDependency;
-class CancelDependency;
-class ResultQueueDependency;
+class ResultSinkDependency;
} // namespace pipeline
class PFetchDataResult;
@@ -169,18 +167,15 @@ public:
Status cancel() override;
- void set_dependency(std::shared_ptr<pipeline::ResultBufferDependency>
buffer_dependency,
- std::shared_ptr<pipeline::ResultQueueDependency>
queue_dependency,
- std::shared_ptr<pipeline::CancelDependency>
cancel_dependency);
+ void set_dependency(std::shared_ptr<pipeline::ResultSinkDependency>
result_sink_dependency);
private:
+ void _update_dependency();
bool _get_batch_queue_empty() override { return _batch_queue_empty; }
void _update_batch_queue_empty() override;
- std::atomic_bool _batch_queue_empty = false;
- std::shared_ptr<pipeline::ResultBufferDependency> _buffer_dependency =
nullptr;
- std::shared_ptr<pipeline::ResultQueueDependency> _queue_dependency =
nullptr;
- std::shared_ptr<pipeline::CancelDependency> _cancel_dependency = nullptr;
+ std::atomic_bool _batch_queue_empty {false};
+ std::shared_ptr<pipeline::ResultSinkDependency> _result_sink_dependency
{nullptr};
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]