This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0.2-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0.2-tmp by this push:
     new f7d95de0bfe [Fix](local merge) Fix local exchange dependencies acquired by local … (#39386)
f7d95de0bfe is described below

commit f7d95de0bfec0013e93649620fd212c8ead82d49
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 19 10:50:29 2024 +0800

    [Fix](local merge) Fix local exchange dependencies acquired by local … (#39386)
    
    …merge (#39238)
---
 be/src/pipeline/dependency.h                            | 17 +++++++++++------
 be/src/pipeline/exec/operator.cpp                       |  7 ++-----
 .../local_exchange/local_exchange_source_operator.cpp   | 17 +++++++++++++----
 be/src/pipeline/pipeline_fragment_context.cpp           |  2 +-
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 36f06b91095..957a6ca8bd3 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -817,9 +817,9 @@ public:
     std::atomic<int64_t> mem_usage = 0;
     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
-    virtual void create_dependencies(int operator_id, int node_id) {
+    virtual void create_dependencies(int local_exchange_id) {
         for (auto& source_dep : source_deps) {
-            source_dep = std::make_shared<Dependency>(operator_id, node_id,
+            source_dep = std::make_shared<Dependency>(local_exchange_id, 
local_exchange_id,
                                                       
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
             source_dep->set_shared_state(this);
         }
@@ -874,6 +874,7 @@ public:
 };
 
 struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
+    ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
     LocalMergeExchangeSharedState(int num_instances)
             : LocalExchangeSharedState(num_instances),
               _queues_mem_usage(num_instances),
@@ -883,14 +884,18 @@ struct LocalMergeExchangeSharedState : public 
LocalExchangeSharedState {
         }
     }
 
-    void create_dependencies(int operator_id, int node_id) override {
+    void create_dependencies(int local_exchange_id) override {
         sink_deps.resize(source_deps.size());
+        std::vector<DependencySPtr> new_deps(sink_deps.size(), nullptr);
+        source_deps.swap(new_deps);
         for (size_t i = 0; i < source_deps.size(); i++) {
-            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
-                                                          
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+            source_deps[i] =
+                    std::make_shared<Dependency>(local_exchange_id, 
local_exchange_id,
+                                                 
"LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY");
             source_deps[i]->set_shared_state(this);
             sink_deps[i] = std::make_shared<Dependency>(
-                    operator_id, node_id, 
"LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
+                    local_exchange_id, local_exchange_id,
+                    "LOCAL_MERGE_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
             sink_deps[i]->set_shared_state(this);
         }
     }
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 1e00b9fcbcb..5f9a904abf2 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -452,10 +452,7 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
             DCHECK(info.le_state_map.find(_parent->operator_id()) != 
info.le_state_map.end());
             _shared_state = 
info.le_state_map.at(_parent->operator_id()).first.get();
 
-            auto deps = _shared_state->get_dep_by_channel_id(info.task_idx);
-            if (deps.size() == 1) {
-                _dependency = deps.front().get();
-            }
+            _dependency = 
_shared_state->get_dep_by_channel_id(info.task_idx).front().get();
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
         } else if (info.shared_state) {
@@ -541,7 +538,7 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
         if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
             DCHECK(info.le_state_map.find(_parent->dests_id().front()) != 
info.le_state_map.end());
             _dependency = 
info.le_state_map.at(_parent->dests_id().front()).second.get();
-            _shared_state = (SharedState*)_dependency->shared_state();
+            _shared_state = _dependency->shared_state()->template 
cast<SharedState>();
         } else {
             _shared_state = info.shared_state->template cast<SharedState>();
             _dependency = _shared_state->create_sink_dependency(
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 0d88545b7e6..0cffe125a1f 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -55,15 +55,24 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
 }
 
 std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
-    auto deps = Base::dependencies();
-    auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
-    if (le_deps.size() > 1) {
+    if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && 
_channel_id == 0) {
+        // If this is a local merge exchange, source operator is runnable only if all sink operators
+        // set dependencies ready
+        std::vector<Dependency*> deps;
+        auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+        DCHECK_GT(le_deps.size(), 1);
         // If this is a local merge exchange, we should use all dependencies here.
         for (auto& dep : le_deps) {
             deps.push_back(dep.get());
         }
+        return deps;
+    } else if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && 
_channel_id != 0) {
+        // If this is a local merge exchange and is not the first task, source operators always
+        // return empty result so no dependencies here.
+        return {};
+    } else {
+        return Base::dependencies();
     }
-    return deps;
 }
 
 std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) 
const {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0ee5da26922..7eed3fbae13 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -801,7 +801,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     operator_xs.insert(operator_xs.begin(), source_op);
 
-    shared_state->create_dependencies(source_op->operator_id(), 
source_op->node_id());
+    shared_state->create_dependencies(local_exchange_id);
 
     // 5. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to