This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0.2-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0.2-tmp by this push:
new f7d95de0bfe [Fix](local merge) Fix local exchange dependencies
acquired by local merge (#39238) (#39386)
f7d95de0bfe is described below
commit f7d95de0bfec0013e93649620fd212c8ead82d49
Author: Gabriel <[email protected]>
AuthorDate: Mon Aug 19 10:50:29 2024 +0800
[Fix](local merge) Fix local exchange dependencies acquired by local merge
(#39238) (#39386)
---
be/src/pipeline/dependency.h | 17 +++++++++++------
be/src/pipeline/exec/operator.cpp | 7 ++-----
.../local_exchange/local_exchange_source_operator.cpp | 17 +++++++++++++----
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
4 files changed, 27 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 36f06b91095..957a6ca8bd3 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -817,9 +817,9 @@ public:
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
- virtual void create_dependencies(int operator_id, int node_id) {
+ virtual void create_dependencies(int local_exchange_id) {
for (auto& source_dep : source_deps) {
- source_dep = std::make_shared<Dependency>(operator_id, node_id,
+ source_dep = std::make_shared<Dependency>(local_exchange_id,
local_exchange_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_dep->set_shared_state(this);
}
@@ -874,6 +874,7 @@ public:
};
struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
+ ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
LocalMergeExchangeSharedState(int num_instances)
: LocalExchangeSharedState(num_instances),
_queues_mem_usage(num_instances),
@@ -883,14 +884,18 @@ struct LocalMergeExchangeSharedState : public
LocalExchangeSharedState {
}
}
- void create_dependencies(int operator_id, int node_id) override {
+ void create_dependencies(int local_exchange_id) override {
sink_deps.resize(source_deps.size());
+ std::vector<DependencySPtr> new_deps(sink_deps.size(), nullptr);
+ source_deps.swap(new_deps);
for (size_t i = 0; i < source_deps.size(); i++) {
- source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
-
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+ source_deps[i] =
+ std::make_shared<Dependency>(local_exchange_id,
local_exchange_id,
+
"LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY");
source_deps[i]->set_shared_state(this);
sink_deps[i] = std::make_shared<Dependency>(
- operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
+ local_exchange_id, local_exchange_id,
+ "LOCAL_MERGE_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
sink_deps[i]->set_shared_state(this);
}
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 1e00b9fcbcb..5f9a904abf2 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -452,10 +452,7 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
DCHECK(info.le_state_map.find(_parent->operator_id()) !=
info.le_state_map.end());
_shared_state =
info.le_state_map.at(_parent->operator_id()).first.get();
- auto deps = _shared_state->get_dep_by_channel_id(info.task_idx);
- if (deps.size() == 1) {
- _dependency = deps.front().get();
- }
+ _dependency =
_shared_state->get_dep_by_channel_id(info.task_idx).front().get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
} else if (info.shared_state) {
@@ -541,7 +538,7 @@ Status
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
DCHECK(info.le_state_map.find(_parent->dests_id().front()) !=
info.le_state_map.end());
_dependency =
info.le_state_map.at(_parent->dests_id().front()).second.get();
- _shared_state = (SharedState*)_dependency->shared_state();
+ _shared_state = _dependency->shared_state()->template
cast<SharedState>();
} else {
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 0d88545b7e6..0cffe125a1f 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -55,15 +55,24 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
}
std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
- auto deps = Base::dependencies();
- auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
- if (le_deps.size() > 1) {
+ if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT &&
_channel_id == 0) {
+ // If this is a local merge exchange, source operator is runnable only
if all sink operators
+ // set dependencies ready
+ std::vector<Dependency*> deps;
+ auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+ DCHECK_GT(le_deps.size(), 1);
// If this is a local merge exchange, we should use all dependencies
here.
for (auto& dep : le_deps) {
deps.push_back(dep.get());
}
+ return deps;
+ } else if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT &&
_channel_id != 0) {
+ // If this is a local merge exchange and is not the first task, source
operators always
+ // return empty result so no dependencies here.
+ return {};
+ } else {
+ return Base::dependencies();
}
- return deps;
}
std::string LocalExchangeSourceLocalState::debug_string(int indentation_level)
const {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0ee5da26922..7eed3fbae13 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -801,7 +801,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);
- shared_state->create_dependencies(source_op->operator_id(),
source_op->node_id());
+ shared_state->create_dependencies(local_exchange_id);
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]