This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8b225c6c3c4 [pipelineX](fix) Fix core dump if cancelled (#29138)
8b225c6c3c4 is described below
commit 8b225c6c3c4872c6a99a2044f2b3345e5d560af2
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 28 10:04:51 2023 +0800
[pipelineX](fix) Fix core dump if cancelled (#29138)
---
be/src/pipeline/exec/analytic_source_operator.cpp | 3 --
be/src/pipeline/pipeline_x/dependency.h | 32 ++++++++--------------
.../local_exchange_source_operator.cpp | 2 +-
be/src/pipeline/pipeline_x/operator.cpp | 6 ----
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 1 +
be/src/vec/exec/vaggregation_node.cpp | 3 --
be/src/vec/exec/vanalytic_eval_node.cpp | 3 --
be/src/vec/exprs/vectorized_agg_fn.cpp | 2 --
be/src/vec/exprs/vectorized_agg_fn.h | 2 --
9 files changed, 13 insertions(+), 41 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 3c99d5bbdd1..a95c2a1225a 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,9 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- for (auto* agg_function : _agg_functions) {
- agg_function->close(state);
- }
static_cast<void>(_destroy_agg_status());
_agg_arena_pool = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 5b57f48dae8..116a9f3e87c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -59,9 +59,6 @@ struct BasicSharedState {
Dependency* source_dep = nullptr;
Dependency* sink_dep = nullptr;
- std::atomic<int> ref_count = 0;
-
- void ref() { ref_count++; }
virtual Status close(RuntimeState* state) { return Status::OK(); }
virtual ~BasicSharedState() = default;
};
@@ -296,24 +293,17 @@ public:
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
- ~AggSharedState() override = default;
+ ~AggSharedState() override {
+ if (probe_expr_ctxs.empty()) {
+ _close_without_key();
+ } else {
+ _close_with_serialized_key();
+ }
+ }
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
}
- Status close(RuntimeState* state) override {
- if (ref_count.fetch_sub(1) == 1) {
- for (auto* aggregate_evaluator : aggregate_evaluators) {
- aggregate_evaluator->close(state);
- }
- if (probe_expr_ctxs.empty()) {
- _close_without_key();
- } else {
- _close_with_serialized_key();
- }
- }
- return Status::OK();
- }
vectorized::AggregatedDataVariantsUPtr agg_data = nullptr;
std::unique_ptr<vectorized::AggregateDataContainer>
aggregate_data_container;
@@ -620,25 +610,25 @@ struct LocalExchangeSharedState : public BasicSharedState
{
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::unique_ptr<Exchanger> exchanger {};
- std::vector<Dependency*> source_dependencies;
+ std::vector<DependencySPtr> source_dependencies;
Dependency* sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void sub_running_sink_operators();
void _set_ready_for_read() {
- for (auto* dep : source_dependencies) {
+ for (auto& dep : source_dependencies) {
DCHECK(dep);
dep->set_ready();
}
}
- void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+ void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
source_dependencies[channel_id] = dep;
}
void set_ready_to_read(int channel_id) {
- auto* dep = source_dependencies[channel_id];
+ auto& dep = source_dependencies[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();
}
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index c1933f93c71..029dcb15a48 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
- _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+ _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index e763a6eee1d..c5dbd34619f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -335,7 +335,6 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
_shared_state =
(typename
DependencyType::SharedState*)_dependency->shared_state().get();
- _shared_state->ref();
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
@@ -343,7 +342,6 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state =
(typename
DependencyType::SharedState*)_dependency->shared_state().get();
- _shared_state->ref();
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
@@ -419,10 +417,6 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() +
"]Time", 1);
}
- if constexpr (!is_fake_shared) {
- _shared_state->ref();
- }
-
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0,
state->get_query_ctx());
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1a0c0749e8b..4c0c0af1cd4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -300,6 +300,7 @@ void PipelineXTask::finalize() {
_finished = true;
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
DependencyMap {}.swap(_upstream_dependency);
+ std::map<int, DependencySPtr> {}.swap(_source_dependency);
_le_state_map.clear();
}
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 69a50ba891b..020763ecff7 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -534,9 +534,6 @@ Status AggregationNode::sink(doris::RuntimeState* state,
vectorized::Block* in_b
}
void AggregationNode::release_resource(RuntimeState* state) {
- for (auto* aggregate_evaluator : _aggregate_evaluators) {
- aggregate_evaluator->close(state);
- }
if (_executor.close) {
_executor.close();
}
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index f279352dd90..918d86fb753 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -292,9 +292,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState*
state) {
if (is_closed()) {
return;
}
- for (auto* agg_function : _agg_functions) {
- agg_function->close(state);
- }
static_cast<void>(_destroy_agg_status());
_release_mem();
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 0e63838e87e..166ad9bc2b2 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -216,8 +216,6 @@ Status AggFnEvaluator::open(RuntimeState* state) {
return VExpr::open(_input_exprs_ctxs, state);
}
-void AggFnEvaluator::close(RuntimeState* state) {}
-
void AggFnEvaluator::create(AggregateDataPtr place) {
_function->create(place);
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index 3cabd275614..546b939ddf4 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -63,8 +63,6 @@ public:
Status open(RuntimeState* state);
- void close(RuntimeState* state);
-
// create/destroy AGG Data
void create(AggregateDataPtr place);
void destroy(AggregateDataPtr place);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]