This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 88d771d3602 [pipeline](fix) Avoid to use a freed dependency when cancelled (#34584) (#38046)
88d771d3602 is described below

commit 88d771d36021bad1a867c0841a2bca44c7f611d6
Author: Gabriel <[email protected]>
AuthorDate: Thu Jul 18 15:27:10 2024 +0800

    [pipeline](fix) Avoid to use a freed dependency when cancelled (#34584) (#38046)
    
    ## Proposed changes
    
    pick #34584
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 10 ++---
 be/src/pipeline/exec/exchange_sink_operator.h      |  6 +--
 be/src/pipeline/exec/exchange_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  3 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  6 +--
 be/src/pipeline/exec/scan_operator.cpp             |  5 +--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  6 +--
 be/src/pipeline/exec/union_source_operator.cpp     |  3 +-
 be/src/pipeline/pipeline_x/dependency.cpp          | 36 ++++++-----------
 be/src/pipeline/pipeline_x/dependency.h            | 46 +++++++---------------
 be/src/pipeline/pipeline_x/operator.cpp            | 10 ++---
 be/src/pipeline/pipeline_x/operator.h              |  6 +--
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  6 +--
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 20 +++++++---
 be/src/runtime/query_context.cpp                   |  3 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        |  2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  3 +-
 17 files changed, 71 insertions(+), 102 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 5832a3695b8..e457f090af5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -188,15 +188,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
             id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
 
     register_channels(_sink_buffer.get());
-    _queue_dependency =
-            Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                      "ExchangeSinkQueueDependency", true, 
state->get_query_ctx());
+    _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  
"ExchangeSinkQueueDependency", true);
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) 
&&
         !only_local_exchange) {
-        _broadcast_dependency =
-                Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                          "BroadcastDependency", true, 
state->get_query_ctx());
+        _broadcast_dependency = Dependency::create_shared(
+                _parent->operator_id(), _parent->node_id(), 
"BroadcastDependency", true);
         _sink_buffer->set_broadcast_dependency(_broadcast_dependency);
         _broadcast_pb_blocks =
                 
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 4a217c940ce..5a7b8bf4201 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -89,9 +89,9 @@ public:
               current_channel_idx(0),
               only_local_exchange(false),
               _serializer(this) {
-        _finish_dependency = std::make_shared<FinishDependency>(
-                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-                state->get_query_ctx());
+        _finish_dependency =
+                std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                             parent->get_name() + 
"_FINISH_DEPENDENCY", true);
     }
 
     std::vector<Dependency*> dependencies() const override {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3f11b30b752..3f3ab736814 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -82,7 +82,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
     for (size_t i = 0; i < queues.size(); i++) {
         deps[i] = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                            "SHUFFLE_DATA_DEPENDENCY", 
state->get_query_ctx());
+                                            "SHUFFLE_DATA_DEPENDENCY");
         queues[i]->set_dependency(deps[i]);
         metrics[i] = 
_runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
                                                            TUnit ::TIME_NS, 
timer_name, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 26a63064e82..33e64017d50 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -42,8 +42,7 @@ 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
                                                          RuntimeState* state)
         : JoinBuildSinkLocalState(parent, state) {
     _finish_dependency = std::make_shared<CountedFinishDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-            state->get_query_ctx());
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY");
 }
 
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 92cd341de19..043a28a5d9b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -28,9 +28,9 @@ namespace doris::pipeline {
 
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase*
 parent,
                                                            RuntimeState* state)
         : Base(parent, state) {
-    _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
-                                                      parent->get_name() + 
"_SPILL_DEPENDENCY",
-                                                      true, 
state->get_query_ctx());
+    _finish_dependency =
+            std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                         parent->get_name() + 
"_SPILL_DEPENDENCY", true);
 }
 Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
                                           doris::pipeline::LocalSinkStateInfo& 
info) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 268089be1e5..385517fbeaf 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -104,9 +104,8 @@ bool ScanLocalState<Derived>::should_run_serial() const {
 template <typename Derived>
 Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& 
info) {
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
-    _scan_dependency =
-            Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                      _parent->get_name() + "_DEPENDENCY", 
state->get_query_ctx());
+    _scan_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                 _parent->get_name() + 
"_DEPENDENCY");
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
             _runtime_profile, "WaitForDependency[" + _scan_dependency->name() 
+ "]Time", 1);
     SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 004283841ec..c945d16cf57 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -23,9 +23,9 @@
 namespace doris::pipeline {
 SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* 
parent, RuntimeState* state)
         : Base(parent, state) {
-    _finish_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
-                                                      parent->get_name() + 
"_SPILL_DEPENDENCY",
-                                                      true, 
state->get_query_ctx());
+    _finish_dependency =
+            std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                         parent->get_name() + 
"_SPILL_DEPENDENCY", true);
 }
 
 Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index c89112261c1..3d92faaa608 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -117,8 +117,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                 
->data_queue.set_source_dependency(_shared_state->source_deps.front());
     } else {
         _only_const_dependency = Dependency::create_shared(
-                _parent->operator_id(), _parent->node_id(), 
_parent->get_name() + "_DEPENDENCY",
-                state->get_query_ctx());
+                _parent->operator_id(), _parent->node_id(), 
_parent->get_name() + "_DEPENDENCY");
         _dependency = _only_const_dependency.get();
         _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                 _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 011c32901bc..66aa14e64b0 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -31,17 +31,14 @@
 namespace doris::pipeline {
 
 Dependency* BasicSharedState::create_source_dependency(int operator_id, int 
node_id,
-                                                       std::string name, 
QueryContext* ctx) {
-    source_deps.push_back(
-            std::make_shared<Dependency>(operator_id, node_id, name + 
"_DEPENDENCY", ctx));
+                                                       std::string name) {
+    source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, 
name + "_DEPENDENCY"));
     source_deps.back()->set_shared_state(this);
     return source_deps.back().get();
 }
 
-Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, 
std::string name,
-                                                     QueryContext* ctx) {
-    sink_deps.push_back(
-            std::make_shared<Dependency>(dest_id, node_id, name + 
"_DEPENDENCY", true, ctx));
+Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, 
std::string name) {
+    sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + 
"_DEPENDENCY", true));
     sink_deps.back()->set_shared_state(this);
     return sink_deps.back().get();
 }
@@ -72,15 +69,6 @@ void Dependency::set_ready() {
 }
 
 Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
-    std::unique_lock<std::mutex> lc(_task_lock);
-    auto ready = _ready.load() || _is_cancelled();
-    if (!ready && task) {
-        _add_block_task(task);
-    }
-    return ready ? nullptr : this;
-}
-
-Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
     std::unique_lock<std::mutex> lc(_task_lock);
     auto ready = _ready.load();
     if (!ready && task) {
@@ -91,20 +79,18 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* 
task) {
 
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _always_ready={}, 
is cancelled={}",
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, 
ready={}, _always_ready={}",
                    std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _always_ready, _is_cancelled());
+                   _ready, _always_ready);
     return fmt::to_string(debug_string_buffer);
 }
 
 std::string CountedFinishDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(
-            debug_string_buffer,
-            "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is 
cancelled={}, count={}",
-            std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(), _ready,
-            _always_ready, _is_cancelled(), _counter);
+    fmt::format_to(debug_string_buffer,
+                   "{}{}: id={}, block task = {}, ready={}, _always_ready={}, 
count={}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
+                   _ready, _always_ready, _counter);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -117,7 +103,7 @@ std::string RuntimeFilterDependency::debug_string(int 
indentation_level) {
 
 Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
     std::unique_lock<std::mutex> lc(_task_lock);
-    auto ready = _ready.load() || _is_cancelled();
+    auto ready = _ready.load();
     if (!ready && task) {
         _add_block_task(task);
         task->_blocked_dep = this;
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index c82104f7535..c4b8b9b9ff0 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -80,30 +80,26 @@ struct BasicSharedState {
 
     virtual ~BasicSharedState() = default;
 
-    Dependency* create_source_dependency(int operator_id, int node_id, 
std::string name,
-                                         QueryContext* ctx);
+    Dependency* create_source_dependency(int operator_id, int node_id, 
std::string name);
 
-    Dependency* create_sink_dependency(int dest_id, int node_id, std::string 
name,
-                                       QueryContext* ctx);
+    Dependency* create_sink_dependency(int dest_id, int node_id, std::string 
name);
 };
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
     ENABLE_FACTORY_CREATOR(Dependency);
-    Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
+    Dependency(int id, int node_id, std::string name)
             : _id(id),
               _node_id(node_id),
               _name(std::move(name)),
               _is_write_dependency(false),
-              _ready(false),
-              _query_ctx(query_ctx) {}
-    Dependency(int id, int node_id, std::string name, bool ready, 
QueryContext* query_ctx)
+              _ready(false) {}
+    Dependency(int id, int node_id, std::string name, bool ready)
             : _id(id),
               _node_id(node_id),
               _name(std::move(name)),
               _is_write_dependency(true),
-              _ready(ready),
-              _query_ctx(query_ctx) {}
+              _ready(ready) {}
     virtual ~Dependency() = default;
 
     bool is_write_dependency() const { return _is_write_dependency; }
@@ -167,14 +163,12 @@ public:
 
 protected:
     void _add_block_task(PipelineXTask* task);
-    bool _is_cancelled() const { return _query_ctx->is_cancelled(); }
 
     const int _id;
     const int _node_id;
     const std::string _name;
     const bool _is_write_dependency;
     std::atomic<bool> _ready;
-    const QueryContext* _query_ctx = nullptr;
 
     BasicSharedState* _shared_state = nullptr;
     MonotonicStopWatch _watcher;
@@ -200,20 +194,11 @@ public:
     [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { 
return nullptr; }
 };
 
-struct FinishDependency : public Dependency {
-public:
-    using SharedState = FakeSharedState;
-    FinishDependency(int id, int node_id, std::string name, QueryContext* 
query_ctx)
-            : Dependency(id, node_id, name, true, query_ctx) {}
-
-    [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
-};
-
 struct CountedFinishDependency final : public Dependency {
 public:
     using SharedState = FakeSharedState;
-    CountedFinishDependency(int id, int node_id, std::string name, 
QueryContext* query_ctx)
-            : Dependency(id, node_id, name, true, query_ctx) {}
+    CountedFinishDependency(int id, int node_id, std::string name)
+            : Dependency(id, node_id, name, true) {}
 
     void add() {
         std::unique_lock<std::mutex> l(_mtx);
@@ -309,9 +294,8 @@ struct RuntimeFilterTimerQueue {
 
 class RuntimeFilterDependency final : public Dependency {
 public:
-    RuntimeFilterDependency(int id, int node_id, std::string name, 
QueryContext* query_ctx,
-                            IRuntimeFilter* runtime_filter)
-            : Dependency(id, node_id, name, query_ctx), 
_runtime_filter(runtime_filter) {}
+    RuntimeFilterDependency(int id, int node_id, std::string name, 
IRuntimeFilter* runtime_filter)
+            : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
     std::string debug_string(int indentation_level = 0) override;
 
     Dependency* is_blocked_by(PipelineXTask* task) override;
@@ -621,8 +605,8 @@ class AsyncWriterDependency final : public Dependency {
 public:
     using SharedState = BasicSharedState;
     ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
-    AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "AsyncWriterDependency", true, 
query_ctx) {}
+    AsyncWriterDependency(int id, int node_id)
+            : Dependency(id, node_id, "AsyncWriterDependency", true) {}
     ~AsyncWriterDependency() override = default;
 };
 
@@ -771,10 +755,10 @@ public:
     std::atomic<int64_t> mem_usage = 0;
     // We need to make sure to add mem_usage first and then enqueue, otherwise 
sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
-    void create_source_dependencies(int operator_id, int node_id, 
QueryContext* ctx) {
+    void create_source_dependencies(int operator_id, int node_id) {
         for (size_t i = 0; i < source_deps.size(); i++) {
-            source_deps[i] = std::make_shared<Dependency>(
-                    operator_id, node_id, 
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
+            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
+                                                          
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
             source_deps[i]->set_shared_state(this);
         }
     };
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 7e047851d40..e577fe707fe 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -400,8 +400,7 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
             _shared_state = info.shared_state->template cast<SharedStateArg>();
 
             _dependency = _shared_state->create_source_dependency(
-                    _parent->operator_id(), _parent->node_id(), 
_parent->get_name(),
-                    state->get_query_ctx());
+                    _parent->operator_id(), _parent->node_id(), 
_parent->get_name());
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
         }
@@ -482,8 +481,7 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
         } else {
             _shared_state = info.shared_state->template cast<SharedState>();
             _dependency = _shared_state->create_sink_dependency(
-                    _parent->dests_id().front(), _parent->node_id(), 
_parent->get_name(),
-                    state->get_query_ctx());
+                    _parent->dests_id().front(), _parent->node_id(), 
_parent->get_name());
         }
         _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                 _profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
@@ -563,8 +561,8 @@ template <typename Writer, typename Parent>
 Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
-    _async_writer_dependency = AsyncWriterDependency::create_shared(
-            _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
+    _async_writer_dependency =
+            AsyncWriterDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
     _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
 
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 97035358037..304d3051214 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -869,9 +869,9 @@ public:
     using Base = PipelineXSinkLocalState<FakeSharedState>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _async_writer_dependency(nullptr) {
-        _finish_dependency = std::make_shared<FinishDependency>(
-                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
-                state->get_query_ctx());
+        _finish_dependency =
+                std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                             parent->get_name() + 
"_FINISH_DEPENDENCY", true);
     }
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 854bd1e7def..90c5394debd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -851,8 +851,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
                                      
std::to_string((int)data_distribution.distribution_type));
     }
     auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
-                                                 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
-                                                 
_runtime_state->get_query_ctx());
+                                                 
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
     sink_dep->set_shared_state(shared_state.get());
     shared_state->sink_deps.push_back(sink_dep);
     _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
@@ -877,8 +876,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     }
     operator_xs.insert(operator_xs.begin(), source_op);
 
-    shared_state->create_source_dependencies(source_op->operator_id(), 
source_op->node_id(),
-                                             _query_ctx.get());
+    shared_state->create_source_dependencies(source_op->operator_id(), 
source_op->node_id());
 
     // 5. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index d51cd670729..ae89fe2cdde 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -139,12 +139,22 @@ public:
     int task_id() const { return _index; };
 
     void clear_blocking_state() {
-        // Another thread may call finalize to release all dependencies
-        // And then it will core.
+        // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && 
_blocked_dep) {
-            _blocked_dep->set_ready();
-            _blocked_dep = nullptr;
+        if (!_finished) {
+            _execution_dep->set_always_ready();
+            for (auto* dep : _filter_dependencies) {
+                dep->set_always_ready();
+            }
+            for (auto* dep : _read_dependencies) {
+                dep->set_always_ready();
+            }
+            for (auto* dep : _write_dependencies) {
+                dep->set_always_ready();
+            }
+            for (auto* dep : _finish_dependencies) {
+                dep->set_always_ready();
+            }
         }
     }
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 40518e62cc8..b8dfb176d98 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -57,8 +57,7 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
     _start_time = VecDateTimeValue::local_time();
     _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _shared_scanner_controller.reset(new 
vectorized::SharedScannerController());
-    _execution_dependency =
-            pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", 
this);
+    _execution_dependency = pipeline::Dependency::create_unique(-1, -1, 
"ExecutionDependency");
     _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
             TUniqueId(), RuntimeFilterParamsContext::create(this), 
query_mem_tracker);
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 30c2cc14917..3ec6f56b3c4 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -89,7 +89,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
         runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
-                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+                id, node_id, name, runtime_filter);
         _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
         runtime_filter_timers[i] = 
std::make_shared<pipeline::RuntimeFilterTimer>(
                 runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index fe46743053a..916d874c02e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -361,8 +361,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, RuntimeState* sta
         _sender_to_local_channel_dependency.resize(num_queues);
         for (size_t i = 0; i < num_queues; i++) {
             _sender_to_local_channel_dependency[i] = 
pipeline::Dependency::create_shared(
-                    _dest_node_id, _dest_node_id, 
"LocalExchangeChannelDependency", true,
-                    state->get_query_ctx());
+                    _dest_node_id, _dest_node_id, 
"LocalExchangeChannelDependency", true);
         }
     }
     _sender_queues.reserve(num_queues);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to