This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 88d771d3602 [pipeline](fix) Avoid to use a freed dependency when
cancelled (#34584) (#38046)
88d771d3602 is described below
commit 88d771d36021bad1a867c0841a2bca44c7f611d6
Author: Gabriel <[email protected]>
AuthorDate: Thu Jul 18 15:27:10 2024 +0800
[pipeline](fix) Avoid to use a freed dependency when cancelled (#34584)
(#38046)
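## Proposed changes
Cherry-pick of #34584 to branch-2.1.
<!--Describe your changes.-->
Summary of the diff below: `Dependency` and its subclasses no longer take or
store a `QueryContext*`, and `is_blocked_by()` no longer consults
`_is_cancelled()`. The `FinishDependency` struct is removed; its users now
create a plain `Dependency` with `ready = true`. `clear_blocking_state()` in
pipeline_x_task.h now takes `_dependency_lock` and, if the task is not
finished, calls `set_always_ready()` on the execution, filter, read, write and
finish dependencies instead of only setting the currently blocked dependency
ready.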
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 10 ++---
be/src/pipeline/exec/exchange_sink_operator.h | 6 +--
be/src/pipeline/exec/exchange_source_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 6 +--
be/src/pipeline/exec/scan_operator.cpp | 5 +--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +--
be/src/pipeline/exec/union_source_operator.cpp | 3 +-
be/src/pipeline/pipeline_x/dependency.cpp | 36 ++++++-----------
be/src/pipeline/pipeline_x/dependency.h | 46 +++++++---------------
be/src/pipeline/pipeline_x/operator.cpp | 10 ++---
be/src/pipeline/pipeline_x/operator.h | 6 +--
.../pipeline_x/pipeline_x_fragment_context.cpp | 6 +--
be/src/pipeline/pipeline_x/pipeline_x_task.h | 20 +++++++---
be/src/runtime/query_context.cpp | 3 +-
be/src/vec/exec/runtime_filter_consumer.cpp | 2 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 3 +-
17 files changed, 71 insertions(+), 102 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 5832a3695b8..e457f090af5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -188,15 +188,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
register_channels(_sink_buffer.get());
- _queue_dependency =
- Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- "ExchangeSinkQueueDependency", true,
state->get_query_ctx());
+ _queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1)
&&
!only_local_exchange) {
- _broadcast_dependency =
- Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- "BroadcastDependency", true,
state->get_query_ctx());
+ _broadcast_dependency = Dependency::create_shared(
+ _parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true);
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 4a217c940ce..5a7b8bf4201 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -89,9 +89,9 @@ public:
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency =
+ std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY", true);
}
std::vector<Dependency*> dependencies() const override {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3f11b30b752..3f3ab736814 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -82,7 +82,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
deps[i] = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- "SHUFFLE_DATA_DEPENDENCY",
state->get_query_ctx());
+ "SHUFFLE_DATA_DEPENDENCY");
queues[i]->set_dependency(deps[i]);
metrics[i] =
_runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
TUnit ::TIME_NS,
timer_name, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 26a63064e82..33e64017d50 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -42,8 +42,7 @@
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
RuntimeState* state)
: JoinBuildSinkLocalState(parent, state) {
_finish_dependency = std::make_shared<CountedFinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY");
}
Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 92cd341de19..043a28a5d9b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -28,9 +28,9 @@ namespace doris::pipeline {
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase*
parent,
RuntimeState* state)
: Base(parent, state) {
- _finish_dependency = std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_SPILL_DEPENDENCY",
- true,
state->get_query_ctx());
+ _finish_dependency =
+ std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_SPILL_DEPENDENCY", true);
}
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo&
info) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 268089be1e5..385517fbeaf 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -104,9 +104,8 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo&
info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
- _scan_dependency =
- Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
- _parent->get_name() + "_DEPENDENCY",
state->get_query_ctx());
+ _scan_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+ _parent->get_name() +
"_DEPENDENCY");
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _scan_dependency->name()
+ "]Time", 1);
SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 004283841ec..c945d16cf57 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -23,9 +23,9 @@
namespace doris::pipeline {
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase*
parent, RuntimeState* state)
: Base(parent, state) {
- _finish_dependency = std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_SPILL_DEPENDENCY",
- true,
state->get_query_ctx());
+ _finish_dependency =
+ std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_SPILL_DEPENDENCY", true);
}
Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index c89112261c1..3d92faaa608 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -117,8 +117,7 @@ Status UnionSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
->data_queue.set_source_dependency(_shared_state->source_deps.front());
} else {
_only_const_dependency = Dependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY",
- state->get_query_ctx());
+ _parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY");
_dependency = _only_const_dependency.get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 011c32901bc..66aa14e64b0 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -31,17 +31,14 @@
namespace doris::pipeline {
Dependency* BasicSharedState::create_source_dependency(int operator_id, int
node_id,
- std::string name,
QueryContext* ctx) {
- source_deps.push_back(
- std::make_shared<Dependency>(operator_id, node_id, name +
"_DEPENDENCY", ctx));
+ std::string name) {
+ source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id,
name + "_DEPENDENCY"));
source_deps.back()->set_shared_state(this);
return source_deps.back().get();
}
-Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
std::string name,
- QueryContext* ctx) {
- sink_deps.push_back(
- std::make_shared<Dependency>(dest_id, node_id, name +
"_DEPENDENCY", true, ctx));
+Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id,
std::string name) {
+ sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name +
"_DEPENDENCY", true));
sink_deps.back()->set_shared_state(this);
return sink_deps.back().get();
}
@@ -72,15 +69,6 @@ void Dependency::set_ready() {
}
Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
- std::unique_lock<std::mutex> lc(_task_lock);
- auto ready = _ready.load() || _is_cancelled();
- if (!ready && task) {
- _add_block_task(task);
- }
- return ready ? nullptr : this;
-}
-
-Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
@@ -91,20 +79,18 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask*
task) {
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _always_ready={},
is cancelled={}",
+ fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {},
ready={}, _always_ready={}",
std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
- _ready, _always_ready, _is_cancelled());
+ _ready, _always_ready);
return fmt::to_string(debug_string_buffer);
}
std::string CountedFinishDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(
- debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _always_ready={}, is
cancelled={}, count={}",
- std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(), _ready,
- _always_ready, _is_cancelled(), _counter);
+ fmt::format_to(debug_string_buffer,
+ "{}{}: id={}, block task = {}, ready={}, _always_ready={},
count={}",
+ std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
+ _ready, _always_ready, _counter);
return fmt::to_string(debug_string_buffer);
}
@@ -117,7 +103,7 @@ std::string RuntimeFilterDependency::debug_string(int
indentation_level) {
Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
- auto ready = _ready.load() || _is_cancelled();
+ auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
task->_blocked_dep = this;
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index c82104f7535..c4b8b9b9ff0 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -80,30 +80,26 @@ struct BasicSharedState {
virtual ~BasicSharedState() = default;
- Dependency* create_source_dependency(int operator_id, int node_id,
std::string name,
- QueryContext* ctx);
+ Dependency* create_source_dependency(int operator_id, int node_id,
std::string name);
- Dependency* create_sink_dependency(int dest_id, int node_id, std::string
name,
- QueryContext* ctx);
+ Dependency* create_sink_dependency(int dest_id, int node_id, std::string
name);
};
class Dependency : public std::enable_shared_from_this<Dependency> {
public:
ENABLE_FACTORY_CREATOR(Dependency);
- Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
+ Dependency(int id, int node_id, std::string name)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(false),
- _ready(false),
- _query_ctx(query_ctx) {}
- Dependency(int id, int node_id, std::string name, bool ready,
QueryContext* query_ctx)
+ _ready(false) {}
+ Dependency(int id, int node_id, std::string name, bool ready)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(true),
- _ready(ready),
- _query_ctx(query_ctx) {}
+ _ready(ready) {}
virtual ~Dependency() = default;
bool is_write_dependency() const { return _is_write_dependency; }
@@ -167,14 +163,12 @@ public:
protected:
void _add_block_task(PipelineXTask* task);
- bool _is_cancelled() const { return _query_ctx->is_cancelled(); }
const int _id;
const int _node_id;
const std::string _name;
const bool _is_write_dependency;
std::atomic<bool> _ready;
- const QueryContext* _query_ctx = nullptr;
BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
@@ -200,20 +194,11 @@ public:
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
return nullptr; }
};
-struct FinishDependency : public Dependency {
-public:
- using SharedState = FakeSharedState;
- FinishDependency(int id, int node_id, std::string name, QueryContext*
query_ctx)
- : Dependency(id, node_id, name, true, query_ctx) {}
-
- [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
-};
-
struct CountedFinishDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
- CountedFinishDependency(int id, int node_id, std::string name,
QueryContext* query_ctx)
- : Dependency(id, node_id, name, true, query_ctx) {}
+ CountedFinishDependency(int id, int node_id, std::string name)
+ : Dependency(id, node_id, name, true) {}
void add() {
std::unique_lock<std::mutex> l(_mtx);
@@ -309,9 +294,8 @@ struct RuntimeFilterTimerQueue {
class RuntimeFilterDependency final : public Dependency {
public:
- RuntimeFilterDependency(int id, int node_id, std::string name,
QueryContext* query_ctx,
- IRuntimeFilter* runtime_filter)
- : Dependency(id, node_id, name, query_ctx),
_runtime_filter(runtime_filter) {}
+ RuntimeFilterDependency(int id, int node_id, std::string name,
IRuntimeFilter* runtime_filter)
+ : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;
Dependency* is_blocked_by(PipelineXTask* task) override;
@@ -621,8 +605,8 @@ class AsyncWriterDependency final : public Dependency {
public:
using SharedState = BasicSharedState;
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
- AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "AsyncWriterDependency", true,
query_ctx) {}
+ AsyncWriterDependency(int id, int node_id)
+ : Dependency(id, node_id, "AsyncWriterDependency", true) {}
~AsyncWriterDependency() override = default;
};
@@ -771,10 +755,10 @@ public:
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
- void create_source_dependencies(int operator_id, int node_id,
QueryContext* ctx) {
+ void create_source_dependencies(int operator_id, int node_id) {
for (size_t i = 0; i < source_deps.size(); i++) {
- source_deps[i] = std::make_shared<Dependency>(
- operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
+ source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
+
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_deps[i]->set_shared_state(this);
}
};
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 7e047851d40..e577fe707fe 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -400,8 +400,7 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_shared_state = info.shared_state->template cast<SharedStateArg>();
_dependency = _shared_state->create_source_dependency(
- _parent->operator_id(), _parent->node_id(),
_parent->get_name(),
- state->get_query_ctx());
+ _parent->operator_id(), _parent->node_id(),
_parent->get_name());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
}
@@ -482,8 +481,7 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
} else {
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
- _parent->dests_id().front(), _parent->node_id(),
_parent->get_name(),
- state->get_query_ctx());
+ _parent->dests_id().front(), _parent->node_id(),
_parent->get_name());
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
@@ -563,8 +561,8 @@ template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
- _async_writer_dependency = AsyncWriterDependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ _async_writer_dependency =
+ AsyncWriterDependency::create_shared(_parent->operator_id(),
_parent->node_id());
_writer->set_dependency(_async_writer_dependency.get(),
_finish_dependency.get());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 97035358037..304d3051214 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -869,9 +869,9 @@ public:
using Base = PipelineXSinkLocalState<FakeSharedState>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
+ _finish_dependency =
+ std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY", true);
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 854bd1e7def..90c5394debd 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -851,8 +851,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
std::to_string((int)data_distribution.distribution_type));
}
auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
-
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
-
_runtime_state->get_query_ctx());
+
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
@@ -877,8 +876,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);
- shared_state->create_source_dependencies(source_op->operator_id(),
source_op->node_id(),
- _query_ctx.get());
+ shared_state->create_source_dependencies(source_op->operator_id(),
source_op->node_id());
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index d51cd670729..ae89fe2cdde 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -139,12 +139,22 @@ public:
int task_id() const { return _index; };
void clear_blocking_state() {
- // Another thread may call finalize to release all dependencies
- // And then it will core.
+ // We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
- if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH &&
_blocked_dep) {
- _blocked_dep->set_ready();
- _blocked_dep = nullptr;
+ if (!_finished) {
+ _execution_dep->set_always_ready();
+ for (auto* dep : _filter_dependencies) {
+ dep->set_always_ready();
+ }
+ for (auto* dep : _read_dependencies) {
+ dep->set_always_ready();
+ }
+ for (auto* dep : _write_dependencies) {
+ dep->set_always_ready();
+ }
+ for (auto* dep : _finish_dependencies) {
+ dep->set_always_ready();
+ }
}
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 40518e62cc8..b8dfb176d98 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -57,8 +57,7 @@ QueryContext::QueryContext(TUniqueId query_id, int
total_fragment_num, ExecEnv*
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new
vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new
vectorized::SharedScannerController());
- _execution_dependency =
- pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency",
this);
+ _execution_dependency = pipeline::Dependency::create_unique(-1, -1,
"ExecutionDependency");
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
TUniqueId(), RuntimeFilterParamsContext::create(this),
query_mem_tracker);
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 30c2cc14917..3ec6f56b3c4 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -89,7 +89,7 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] =
std::make_shared<pipeline::RuntimeFilterDependency>(
- id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ id, node_id, name, runtime_filter);
_runtime_filter_ctxs[i].runtime_filter_dependency =
runtime_filter_dependencies[i].get();
runtime_filter_timers[i] =
std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(),
runtime_filter->wait_time_ms(),
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index fe46743053a..916d874c02e 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -361,8 +361,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
_sender_to_local_channel_dependency.resize(num_queues);
for (size_t i = 0; i < num_queues; i++) {
_sender_to_local_channel_dependency[i] =
pipeline::Dependency::create_shared(
- _dest_node_id, _dest_node_id,
"LocalExchangeChannelDependency", true,
- state->get_query_ctx());
+ _dest_node_id, _dest_node_id,
"LocalExchangeChannelDependency", true);
}
}
_sender_queues.reserve(num_queues);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]