This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b817197b2b [pipelineX](sink) simplify result sink dependency (#27226)
7b817197b2b is described below

commit 7b817197b2b8a9d6f7b3aac9c7724c8bae03e956
Author: Gabriel <[email protected]>
AuthorDate: Sun Nov 19 12:42:10 2023 +0800

    [pipelineX](sink) simplify result sink dependency (#27226)
---
 be/src/pipeline/exec/result_sink_operator.cpp | 26 +++-------------
 be/src/pipeline/exec/result_sink_operator.h   | 41 ++++---------------------
 be/src/runtime/buffer_control_block.cpp       | 44 +++++++++++----------------
 be/src/runtime/buffer_control_block.h         | 15 +++------
 4 files changed, 33 insertions(+), 93 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index e27b0da1d32..321642bdba0 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -56,9 +56,6 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     SCOPED_TIMER(_open_timer);
     static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name);
-    _wait_for_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForQueue", 
timer_name);
-    _wait_for_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBuffer", 
timer_name);
-    _wait_for_cancel_timer = ADD_CHILD_TIMER(_profile, "WaitForCancel", 
timer_name);
     auto fragment_instance_id = state->fragment_instance_id();
     // create sender
     std::shared_ptr<BufferControlBlock> sender = nullptr;
@@ -66,19 +63,9 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
             state->fragment_instance_id(), 
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
     _result_sink_dependency =
-            OrDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
-    _buffer_dependency =
-            ResultBufferDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
-    _cancel_dependency =
-            CancelDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
-    _result_sink_dependency->add_child(_cancel_dependency);
-    _result_sink_dependency->add_child(_buffer_dependency);
-    _queue_dependency =
-            ResultQueueDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
-    _result_sink_dependency->add_child(_queue_dependency);
-
-    ((PipBufferControlBlock*)_sender.get())
-            ->set_dependency(_buffer_dependency, _queue_dependency, 
_cancel_dependency);
+            ResultSinkDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
+
+    
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
     return Status::OK();
 }
 
@@ -176,13 +163,8 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
         return Status::OK();
     }
     SCOPED_TIMER(_close_timer);
-    COUNTER_UPDATE(_wait_for_queue_timer, 
_queue_dependency->write_watcher_elapse_time());
-    COUNTER_UPDATE(exec_time_counter(), 
_queue_dependency->write_watcher_elapse_time());
-    COUNTER_SET(_wait_for_buffer_timer, 
_buffer_dependency->write_watcher_elapse_time());
-    COUNTER_UPDATE(exec_time_counter(), 
_buffer_dependency->write_watcher_elapse_time());
-    COUNTER_SET(_wait_for_cancel_timer, 
_cancel_dependency->write_watcher_elapse_time());
-    COUNTER_UPDATE(exec_time_counter(), 
_cancel_dependency->write_watcher_elapse_time());
     SCOPED_TIMER(exec_time_counter());
+    COUNTER_SET(_wait_for_dependency_timer, 
_result_sink_dependency->write_watcher_elapse_time());
     Status final_status = exec_status;
     if (_writer) {
         // close the writer
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index e117f55d25c..53e0292f68f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -43,33 +43,12 @@ public:
     bool can_write() override;
 };
 
-class ResultBufferDependency final : public WriteDependency {
+class ResultSinkDependency final : public WriteDependency {
 public:
-    ENABLE_FACTORY_CREATOR(ResultBufferDependency);
-    ResultBufferDependency(int id, int node_id)
-            : WriteDependency(id, node_id, "ResultBufferDependency") {}
-    ~ResultBufferDependency() override = default;
-
-    void* shared_state() override { return nullptr; }
-};
-
-class ResultQueueDependency final : public WriteDependency {
-public:
-    ENABLE_FACTORY_CREATOR(ResultQueueDependency);
-    ResultQueueDependency(int id, int node_id)
-            : WriteDependency(id, node_id, "ResultQueueDependency") {}
-    ~ResultQueueDependency() override = default;
-
-    void* shared_state() override { return nullptr; }
-};
-
-class CancelDependency final : public WriteDependency {
-public:
-    ENABLE_FACTORY_CREATOR(CancelDependency);
-    CancelDependency(int id, int node_id) : WriteDependency(id, node_id, 
"CancelDependency") {
-        _ready_for_write = false;
-    }
-    ~CancelDependency() override = default;
+    ENABLE_FACTORY_CREATOR(ResultSinkDependency);
+    ResultSinkDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultSinkDependency") {}
+    ~ResultSinkDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
@@ -93,15 +72,7 @@ private:
 
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<ResultWriter> _writer;
-    std::shared_ptr<OrDependency> _result_sink_dependency;
-    std::shared_ptr<pipeline::ResultBufferDependency> _buffer_dependency;
-    std::shared_ptr<pipeline::ResultQueueDependency> _queue_dependency;
-    std::shared_ptr<pipeline::CancelDependency> _cancel_dependency;
-
-    RuntimeProfile::Counter* _wait_for_queue_timer = nullptr;
-    RuntimeProfile::Counter* _wait_for_buffer_timer = nullptr;
-    // time of prefilter input block from scanner
-    RuntimeProfile::Counter* _wait_for_cancel_timer = nullptr;
+    std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
 };
 
 class ResultSinkOperatorX final : public 
DataSinkOperatorX<ResultSinkLocalState> {
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 17f90889022..867f34a0127 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -269,60 +269,52 @@ Status BufferControlBlock::cancel() {
 
 Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
     RETURN_IF_ERROR(BufferControlBlock::add_batch(result));
-    if (_buffer_dependency && _buffer_rows >= _buffer_limit) {
-        _buffer_dependency->block_writing();
-    }
+    _update_dependency();
     return Status::OK();
 }
 
 Status 
PipBufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& 
result) {
     RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result));
-    if (_buffer_dependency && _buffer_rows >= _buffer_limit) {
-        _buffer_dependency->block_writing();
-    }
+    _update_dependency();
     return Status::OK();
 }
 
 void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
     BufferControlBlock::get_batch(ctx);
-    if (_buffer_dependency && _buffer_rows < _buffer_limit) {
-        _buffer_dependency->set_ready_for_write();
-    }
+    _update_dependency();
 }
 
 Status 
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* 
result) {
     RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result));
-    if (_buffer_dependency && _buffer_rows < _buffer_limit) {
-        _buffer_dependency->set_ready_for_write();
-    }
+    _update_dependency();
     return Status::OK();
 }
 
 Status PipBufferControlBlock::cancel() {
     RETURN_IF_ERROR(BufferControlBlock::cancel());
-    if (_cancel_dependency) {
-        _cancel_dependency->set_ready_for_write();
-    }
+    _update_dependency();
 
     return Status::OK();
 }
 
 void PipBufferControlBlock::set_dependency(
-        std::shared_ptr<pipeline::ResultBufferDependency> buffer_dependency,
-        std::shared_ptr<pipeline::ResultQueueDependency> queue_dependency,
-        std::shared_ptr<pipeline::CancelDependency> cancel_dependency) {
-    _buffer_dependency = buffer_dependency;
-    _queue_dependency = queue_dependency;
-    _cancel_dependency = cancel_dependency;
+        std::shared_ptr<pipeline::ResultSinkDependency> 
result_sink_dependency) {
+    _result_sink_dependency = result_sink_dependency;
+}
+
+void PipBufferControlBlock::_update_dependency() {
+    if (_result_sink_dependency &&
+        (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled)) 
{
+        _result_sink_dependency->set_ready_for_write();
+    } else if (_result_sink_dependency &&
+               (!_batch_queue_empty && _buffer_rows < _buffer_limit && 
!_is_cancelled)) {
+        _result_sink_dependency->block_writing();
+    }
 }
 
 void PipBufferControlBlock::_update_batch_queue_empty() {
     _batch_queue_empty = _fe_result_batch_queue.empty() && 
_arrow_flight_batch_queue.empty();
-    if (_queue_dependency && _batch_queue_empty) {
-        _queue_dependency->set_ready_for_write();
-    } else if (_queue_dependency) {
-        _queue_dependency->block_writing();
-    }
+    _update_dependency();
 }
 
 } // namespace doris
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 1b53193f917..c388cf7d500 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -48,9 +48,7 @@ class Controller;
 namespace doris {
 
 namespace pipeline {
-class ResultBufferDependency;
-class CancelDependency;
-class ResultQueueDependency;
+class ResultSinkDependency;
 } // namespace pipeline
 
 class PFetchDataResult;
@@ -169,18 +167,15 @@ public:
 
     Status cancel() override;
 
-    void set_dependency(std::shared_ptr<pipeline::ResultBufferDependency> 
buffer_dependency,
-                        std::shared_ptr<pipeline::ResultQueueDependency> 
queue_dependency,
-                        std::shared_ptr<pipeline::CancelDependency> 
cancel_dependency);
+    void set_dependency(std::shared_ptr<pipeline::ResultSinkDependency> 
result_sink_dependency);
 
 private:
+    void _update_dependency();
     bool _get_batch_queue_empty() override { return _batch_queue_empty; }
     void _update_batch_queue_empty() override;
 
-    std::atomic_bool _batch_queue_empty = false;
-    std::shared_ptr<pipeline::ResultBufferDependency> _buffer_dependency = 
nullptr;
-    std::shared_ptr<pipeline::ResultQueueDependency> _queue_dependency = 
nullptr;
-    std::shared_ptr<pipeline::CancelDependency> _cancel_dependency = nullptr;
+    std::atomic_bool _batch_queue_empty {false};
+    std::shared_ptr<pipeline::ResultSinkDependency> _result_sink_dependency 
{nullptr};
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to