This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b225c6c3c4 [pipelineX](fix) Fix core dump if cancelled (#29138)
8b225c6c3c4 is described below

commit 8b225c6c3c4872c6a99a2044f2b3345e5d560af2
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 28 10:04:51 2023 +0800

    [pipelineX](fix) Fix core dump if cancelled (#29138)
---
 be/src/pipeline/exec/analytic_source_operator.cpp  |  3 --
 be/src/pipeline/pipeline_x/dependency.h            | 32 ++++++++--------------
 .../local_exchange_source_operator.cpp             |  2 +-
 be/src/pipeline/pipeline_x/operator.cpp            |  6 ----
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  1 +
 be/src/vec/exec/vaggregation_node.cpp              |  3 --
 be/src/vec/exec/vanalytic_eval_node.cpp            |  3 --
 be/src/vec/exprs/vectorized_agg_fn.cpp             |  2 --
 be/src/vec/exprs/vectorized_agg_fn.h               |  2 --
 9 files changed, 13 insertions(+), 41 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index 3c99d5bbdd1..a95c2a1225a 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,9 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    for (auto* agg_function : _agg_functions) {
-        agg_function->close(state);
-    }
 
     static_cast<void>(_destroy_agg_status());
     _agg_arena_pool = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 5b57f48dae8..116a9f3e87c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -59,9 +59,6 @@ struct BasicSharedState {
     Dependency* source_dep = nullptr;
     Dependency* sink_dep = nullptr;
 
-    std::atomic<int> ref_count = 0;
-
-    void ref() { ref_count++; }
     virtual Status close(RuntimeState* state) { return Status::OK(); }
     virtual ~BasicSharedState() = default;
 };
@@ -296,24 +293,17 @@ public:
         agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
         agg_arena_pool = std::make_unique<vectorized::Arena>();
     }
-    ~AggSharedState() override = default;
+    ~AggSharedState() override {
+        if (probe_expr_ctxs.empty()) {
+            _close_without_key();
+        } else {
+            _close_with_serialized_key();
+        }
+    }
     void init_spill_partition_helper(size_t spill_partition_count_bits) {
         spill_partition_helper =
                 std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
     }
-    Status close(RuntimeState* state) override {
-        if (ref_count.fetch_sub(1) == 1) {
-            for (auto* aggregate_evaluator : aggregate_evaluators) {
-                aggregate_evaluator->close(state);
-            }
-            if (probe_expr_ctxs.empty()) {
-                _close_without_key();
-            } else {
-                _close_with_serialized_key();
-            }
-        }
-        return Status::OK();
-    }
 
     vectorized::AggregatedDataVariantsUPtr agg_data = nullptr;
     std::unique_ptr<vectorized::AggregateDataContainer> aggregate_data_container;
@@ -620,25 +610,25 @@ struct LocalExchangeSharedState : public BasicSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::unique_ptr<Exchanger> exchanger {};
-    std::vector<Dependency*> source_dependencies;
+    std::vector<DependencySPtr> source_dependencies;
     Dependency* sink_dependency;
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
     void sub_running_sink_operators();
     void _set_ready_for_read() {
-        for (auto* dep : source_dependencies) {
+        for (auto& dep : source_dependencies) {
             DCHECK(dep);
             dep->set_ready();
         }
     }
 
-    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+    void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
         source_dependencies[channel_id] = dep;
     }
 
     void set_ready_to_read(int channel_id) {
-        auto* dep = source_dependencies[channel_id];
+        auto& dep = source_dependencies[channel_id];
         DCHECK(dep) << channel_id;
         dep->set_ready();
     }
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index c1933f93c71..029dcb15a48 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _channel_id = info.task_idx;
-    _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+    _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
     _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index e763a6eee1d..c5dbd34619f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -335,7 +335,6 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
             _dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
             _shared_state =
                     (typename DependencyType::SharedState*)_dependency->shared_state().get();
-            _shared_state->ref();
 
             _shared_state->source_dep = _dependency;
             _shared_state->sink_dep = deps.front().get();
@@ -343,7 +342,6 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
             _dependency->set_shared_state(deps.front()->shared_state());
             _shared_state =
                     (typename DependencyType::SharedState*)_dependency->shared_state().get();
-            _shared_state->ref();
 
             _shared_state->source_dep = _dependency;
             _shared_state->sink_dep = deps.front().get();
@@ -419,10 +417,6 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
         }
-        if constexpr (!is_fake_shared) {
-            _shared_state->ref();
-        }
-
     } else {
         auto& deps = info.dependencys;
         deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 1a0c0749e8b..4c0c0af1cd4 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -300,6 +300,7 @@ void PipelineXTask::finalize() {
     _finished = true;
     std::vector<DependencySPtr> {}.swap(_downstream_dependency);
     DependencyMap {}.swap(_upstream_dependency);
+    std::map<int, DependencySPtr> {}.swap(_source_dependency);
 
     _le_state_map.clear();
 }
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 69a50ba891b..020763ecff7 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -534,9 +534,6 @@ Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_b
 }
 
 void AggregationNode::release_resource(RuntimeState* state) {
-    for (auto* aggregate_evaluator : _aggregate_evaluators) {
-        aggregate_evaluator->close(state);
-    }
     if (_executor.close) {
         _executor.close();
     }
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index f279352dd90..918d86fb753 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -292,9 +292,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* state) {
     if (is_closed()) {
         return;
     }
-    for (auto* agg_function : _agg_functions) {
-        agg_function->close(state);
-    }
 
     static_cast<void>(_destroy_agg_status());
     _release_mem();
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 0e63838e87e..166ad9bc2b2 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -216,8 +216,6 @@ Status AggFnEvaluator::open(RuntimeState* state) {
     return VExpr::open(_input_exprs_ctxs, state);
 }
 
-void AggFnEvaluator::close(RuntimeState* state) {}
-
 void AggFnEvaluator::create(AggregateDataPtr place) {
     _function->create(place);
 }
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index 3cabd275614..546b939ddf4 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -63,8 +63,6 @@ public:
 
     Status open(RuntimeState* state);
 
-    void close(RuntimeState* state);
-
     // create/destroy AGG Data
     void create(AggregateDataPtr place);
     void destroy(AggregateDataPtr place);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to