This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe5c4e3b465 [pipelineX](api) Fix core dump for pipelineX API (#27437)
fe5c4e3b465 is described below

commit fe5c4e3b465ff4055d979ba2624c86526428fa45
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 23 00:24:32 2023 +0800

    [pipelineX](api) Fix core dump for pipelineX API (#27437)
---
 be/src/pipeline/exec/operator.h                    | 10 --------
 .../pipeline/exec/partition_sort_sink_operator.cpp |  2 --
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 --
 .../exec/partition_sort_source_operator.cpp        |  2 --
 .../pipeline/exec/partition_sort_source_operator.h |  4 +---
 be/src/pipeline/pipeline_task.cpp                  | 13 ----------
 be/src/pipeline/pipeline_task.h                    |  5 +---
 be/src/pipeline/pipeline_x/operator.h              |  4 ----
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 25 +++++++++----------
 be/src/pipeline/pipeline_x/pipeline_x_task.h       | 13 ++++------
 be/src/pipeline/task_scheduler.cpp                 | 28 ++++++++--------------
 12 files changed, 31 insertions(+), 79 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 5362be88e89..d7330648757 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -241,12 +241,6 @@ public:
     virtual Status sink(RuntimeState* state, vectorized::Block* block,
                         SourceState source_state) = 0;
 
-    virtual Status finalize(RuntimeState* state) {
-        std::stringstream error_msg;
-        error_msg << " not a sink, can not finalize";
-        return Status::NotSupported(error_msg.str());
-    }
-
     /**
      * pending_finish means we have called `close` and there are still some work to do before finishing.
      * Now it is a pull-based pipeline and operators pull data from its child by this method.
@@ -323,8 +317,6 @@ public:
         return Status::OK();
     }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { 
return _sink->profile(); }
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) 
override {
         _sink->set_query_statistics(statistics);
@@ -391,8 +383,6 @@ public:
         return Status::OK();
     }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     bool can_read() override { return _node->can_read(); }
 
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index d7a2bc4c920..327d186d77b 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -42,8 +42,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     _agg_arena_pool = std::make_unique<vectorized::Arena>();
     _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", 
TUnit::UNIT);
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
-    _partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime");
-    _get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
     _emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
     _init_hash_method();
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h
index dbfb7ad5d0e..62bf7c16b43 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -84,8 +84,6 @@ private:
 
     RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _emplace_key_timer;
-    RuntimeProfile::Counter* _partition_sort_timer;
-    RuntimeProfile::Counter* _get_sorted_timer;
     RuntimeProfile::Counter* _selector_block_timer;
 
     RuntimeProfile::Counter* _hash_table_size_counter;
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 0a2f28b19f3..56db3852bc2 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -33,7 +33,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo&
     
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortSourceDependency>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _get_next_timer = ADD_TIMER(profile(), "GetResultTime");
     _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
     _shared_state->previous_row = 
std::make_unique<vectorized::SortCursorCmp>();
     return Status::OK();
@@ -44,7 +43,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    SCOPED_TIMER(local_state._get_next_timer);
     output_block->clear_column_data();
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h
index df22bde6365..719f6c13889 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -83,15 +83,13 @@ public:
     using Base = PipelineXLocalState<PartitionSortSourceDependency>;
     PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<PartitionSortSourceDependency>(state, 
parent),
-              _get_sorted_timer(nullptr),
-              _get_next_timer(nullptr) {}
+              _get_sorted_timer(nullptr) {}
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
 private:
     friend class PartitionSortSourceOperatorX;
     RuntimeProfile::Counter* _get_sorted_timer;
-    RuntimeProfile::Counter* _get_next_timer;
     std::atomic<int> _sort_idx = 0;
 };
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 72552b48468..7d9900f637c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -113,7 +113,6 @@ void PipelineTask::_init_profile() {
     _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", 
exec_time);
     _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", 
TUnit::UNIT);
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
-    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", 
exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
     _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
@@ -320,18 +319,6 @@ Status PipelineTask::execute(bool* eos) {
     return Status::OK();
 }
 
-Status PipelineTask::finalize() {
-    SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_CPU_TIMER(_task_cpu_timer);
-    Defer defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, _finalize_timer->value());
-        }
-    }};
-    SCOPED_TIMER(_finalize_timer);
-    return _sink->finalize(_state);
-}
-
 Status PipelineTask::_collect_query_statistics() {
     // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline.
     if (_pipeline->is_root_pipeline()) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 0b26c9d215d..4e9c2f8cc9e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -172,7 +172,7 @@ public:
 
     virtual bool sink_can_write() { return _sink->can_write() || 
_pipeline->_always_can_write; }
 
-    virtual Status finalize();
+    virtual void finalize() {}
 
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
@@ -193,8 +193,6 @@ public:
         _previous_schedule_id = id;
     }
 
-    virtual void release_dependency() {}
-
     bool has_dependency();
 
     OperatorPtr get_root() { return _root; }
@@ -315,7 +313,6 @@ protected:
     RuntimeProfile::Counter* _get_block_timer;
     RuntimeProfile::Counter* _get_block_counter;
     RuntimeProfile::Counter* _sink_timer;
-    RuntimeProfile::Counter* _finalize_timer;
     RuntimeProfile::Counter* _close_timer;
     RuntimeProfile::Counter* _block_counts;
     RuntimeProfile::Counter* _block_by_source_counts;
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 943d8c4670b..3f9548099a2 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -182,8 +182,6 @@ public:
 
     Status open(RuntimeState* state) override;
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     [[nodiscard]] bool can_terminate_early() override { return false; }
 
     [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { 
return false; }
@@ -518,8 +516,6 @@ public:
 
     [[nodiscard]] std::string get_name() const override { return _name; }
 
-    Status finalize(RuntimeState* state) override { return Status::OK(); }
-
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
 protected:
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e47b55e4914..e832124bf88 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1044,7 +1044,7 @@ std::string PipelineXFragmentContext::debug_string() {
     for (size_t j = 0; j < _tasks.size(); j++) {
         fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
         for (size_t i = 0; i < _tasks[j].size(); i++) {
-            if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) {
+            if (_tasks[j][i]->is_finished()) {
                 continue;
             }
             fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, 
_tasks[j][i]->debug_string());
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 703e4862f7c..110faba5ca1 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -156,7 +156,6 @@ void PipelineXTask::_init_profile() {
     _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", 
exec_time);
     _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter", 
TUnit::UNIT);
     _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
-    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", 
exec_time);
     _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
 
     _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
@@ -289,16 +288,14 @@ Status PipelineXTask::execute(bool* eos) {
     return Status::OK();
 }
 
-Status PipelineXTask::finalize() {
-    SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_CPU_TIMER(_task_cpu_timer);
-    Defer defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, _finalize_timer->value());
-        }
-    }};
-    SCOPED_TIMER(_finalize_timer);
-    return _sink->finalize(_state);
+void PipelineXTask::finalize() {
+    PipelineTask::finalize();
+    std::unique_lock<std::mutex> lc(_release_lock);
+    _finished = true;
+    std::vector<DependencySPtr> {}.swap(_downstream_dependency);
+    DependencyMap {}.swap(_upstream_dependency);
+
+    _local_exchange_state = nullptr;
 }
 
 Status PipelineXTask::try_close(Status exec_status) {
@@ -338,6 +335,10 @@ Status PipelineXTask::close(Status exec_status) {
 }
 
 std::string PipelineXTask::debug_string() {
+    std::unique_lock<std::mutex> lc(_release_lock);
+    if (_finished) {
+        return "ALREADY FINISHED";
+    }
     fmt::memory_buffer debug_string_buffer;
 
     fmt::format_to(debug_string_buffer, "QueryId: {}\n", 
print_id(query_context()->query_id()));
@@ -374,7 +375,7 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
     for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
         fmt::format_to(debug_string_buffer, "{}. {}\n", i,
-                       _finish_dependencies[i]->debug_string(j + 1));
+                       _finish_dependencies[j]->debug_string(j + 1));
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 46d006d0dac..621773b2ac1 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -84,7 +84,9 @@ public:
 
     bool sink_can_write() override { return _write_blocked_dependency() == 
nullptr; }
 
-    Status finalize() override;
+    void finalize() override;
+
+    bool is_finished() const { return _finished.load(); }
 
     std::string debug_string() override;
 
@@ -103,13 +105,6 @@ public:
         }
     }
 
-    void release_dependency() override {
-        std::vector<DependencySPtr> {}.swap(_downstream_dependency);
-        DependencyMap {}.swap(_upstream_dependency);
-
-        _local_exchange_state = nullptr;
-    }
-
     std::vector<DependencySPtr>& get_upstream_dependency(int id) {
         if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
             _upstream_dependency.insert({id, {DependencySPtr {}}});
@@ -212,6 +207,8 @@ private:
     Dependency* _blocked_dep {nullptr};
 
     std::atomic<bool> _use_blocking_queue {true};
+    std::atomic<bool> _finished {false};
+    std::mutex _release_lock;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 5470ba22591..cc6e502fb5e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -313,23 +313,15 @@ void TaskScheduler::_do_work(size_t index) {
             task->set_eos_time();
             // TODO: pipeline parallel need to wait the last task finish to call finalize
             //  and find_p_dependency
-            status = task->finalize();
-            if (!status.ok()) {
-                // execute failed,cancel all fragment
-                fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                     "finalize fail:" + status.msg());
-            } else {
-                VLOG_DEBUG << fmt::format(
-                        "Try close task: {}, fragment_ctx->is_canceled(): {}",
-                        PrintInstanceStandardInfo(
-                                task->query_context()->query_id(),
-                                
task->fragment_context()->get_fragment_instance_id()),
-                        fragment_ctx->is_canceled());
-                _try_close_task(task,
-                                fragment_ctx->is_canceled() ? 
PipelineTaskState::CANCELED
-                                                            : 
PipelineTaskState::FINISHED,
-                                status);
-            }
+            VLOG_DEBUG << fmt::format(
+                    "Try close task: {}, fragment_ctx->is_canceled(): {}",
+                    
PrintInstanceStandardInfo(task->query_context()->query_id(),
+                                              
task->fragment_context()->get_fragment_instance_id()),
+                    fragment_ctx->is_canceled());
+            _try_close_task(task,
+                            fragment_ctx->is_canceled() ? 
PipelineTaskState::CANCELED
+                                                        : 
PipelineTaskState::FINISHED,
+                            status);
             VLOG_DEBUG << fmt::format(
                     "Task {} is eos, status {}.",
                     
PrintInstanceStandardInfo(task->query_context()->query_id(),
@@ -388,7 +380,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
     }
     task->set_state(state);
     task->set_close_pipeline_time();
-    task->release_dependency();
+    task->finalize();
     task->set_running(false);
     // close_a_pipeline may delete fragment context and will core in some defer
     // code, because the defer code will access fragment context it self.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to