This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba882dea21 [pipelineX](dependency) Build DAG between pipelines (#23355)
ba882dea21 is described below
commit ba882dea2170c0d01ebdcb5ea7932c395cb0880e
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 23 13:21:32 2023 +0800
[pipelineX](dependency) Build DAG between pipelines (#23355)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 ----
be/src/pipeline/exec/aggregation_sink_operator.h | 1 -
.../pipeline/exec/aggregation_source_operator.cpp | 14 ++++-
be/src/pipeline/exec/aggregation_source_operator.h | 1 +
be/src/pipeline/pipeline.h | 2 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 63 +++++++++++++++++++---
.../pipeline_x/pipeline_x_fragment_context.h | 3 ++
be/src/vec/common/sort/vsort_exec_exprs.cpp | 1 +
8 files changed, 75 insertions(+), 21 deletions(-)
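In short: instead of the old implicit rule that pipeline N waits on pipeline N+1, the patch records explicit edges from each downstream pipeline to the upstream pipelines it must wait for. A minimal standalone sketch of that structure, using simplified names rather than the actual Doris types:

    #include <cstdint>
    #include <map>
    #include <vector>

    using PipelineId = uint32_t;

    // Downstream pipeline ID -> IDs of the upstream pipelines it waits on.
    // Mirrors the `_dag` member added to PipelineXFragmentContext in this patch.
    std::map<PipelineId, std::vector<PipelineId>> dag;

    // Record that `downstream` cannot start until `upstream` finishes.
    void add_dependency(PipelineId downstream, PipelineId upstream) {
        dag[downstream].push_back(upstream); // operator[] default-constructs the vector
    }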
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 6db0162325..854a4ed122 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -56,7 +56,6 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st
_merge_timer(nullptr),
_serialize_data_timer(nullptr),
_deserialize_data_timer(nullptr),
- _hash_table_size_counter(nullptr),
_max_row_size_counter(nullptr) {}
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
@@ -96,7 +95,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
_deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
- _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
_hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(profile(), "MaxRowSizeInBytes", TUnit::UNIT);
COUNTER_SET(_max_row_size_counter, (int64_t)0);
@@ -863,15 +861,6 @@ Status AggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateIn
Status AggSinkOperatorX::close(RuntimeState* state) {
auto& local_state = state->get_sink_local_state(id())->cast<AggSinkLocalState>();
- /// _hash_table_size_counter may be null if prepare failed.
- if (local_state._hash_table_size_counter) {
- std::visit(
- [&](auto&& agg_method) {
- COUNTER_SET(local_state._hash_table_size_counter,
- int64_t(agg_method.data.size()));
- },
- local_state._agg_data->method_variant);
- }
local_state._preagg_block.clear();
vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 72136cd3f3..34267eb270 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -279,7 +279,6 @@ protected:
RuntimeProfile::Counter* _merge_timer;
RuntimeProfile::Counter* _serialize_data_timer;
RuntimeProfile::Counter* _deserialize_data_timer;
- RuntimeProfile::Counter* _hash_table_size_counter;
RuntimeProfile::Counter* _max_row_size_counter;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::Counter* _hash_table_memory_usage;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 5dee84b216..1c9d9864d7 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -33,7 +33,8 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
_serialize_result_timer(nullptr),
_hash_table_iterate_timer(nullptr),
_insert_keys_to_column_timer(nullptr),
- _serialize_data_timer(nullptr) {}
+ _serialize_data_timer(nullptr),
+ _hash_table_size_counter(nullptr) {}
Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
@@ -45,6 +46,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
_insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime");
_serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
+ _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT);
auto& p = _parent->cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
@@ -527,6 +529,16 @@ Status AggSourceOperatorX::close(RuntimeState* state) {
local_state._executor.close();
}
+ /// _hash_table_size_counter may be null if prepare failed.
+ if (local_state._hash_table_size_counter) {
+ std::visit(
+ [&](auto&& agg_method) {
+ COUNTER_SET(local_state._hash_table_size_counter,
+ int64_t(agg_method.data.size()));
+ },
+ local_state._agg_data->method_variant);
+ }
+
local_state._shared_state->agg_data = nullptr;
local_state._shared_state->aggregate_data_container = nullptr;
local_state._shared_state->agg_arena_pool = nullptr;
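The std::visit above reads the final hash table size out of whichever hash-table implementation the variant currently holds; a single generic lambda covers every alternative. A self-contained sketch of the same pattern with stand-in map types (the real code visits `_agg_data->method_variant` and reads `agg_method.data.size()`):

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <variant>

    int main() {
        // Stand-in for method_variant: several hash-table layouts, one active at a time.
        std::variant<std::unordered_map<int64_t, int64_t>,
                     std::unordered_map<std::string, int64_t>>
                method_variant;
        method_variant = std::unordered_map<int64_t, int64_t>{{1, 10}, {2, 20}};

        // The generic lambda compiles for every alternative; each exposes size().
        int64_t hash_table_size = std::visit(
                [](auto&& agg_method) { return int64_t(agg_method.size()); }, method_variant);

        std::cout << hash_table_size << '\n'; // prints 2
    }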
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h
index 6831a110e6..df99f75023 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -88,6 +88,7 @@ private:
RuntimeProfile::Counter* _hash_table_iterate_timer;
RuntimeProfile::Counter* _insert_keys_to_column_timer;
RuntimeProfile::Counter* _serialize_data_timer;
+ RuntimeProfile::Counter* _hash_table_size_counter;
using vectorized_get_result = std::function<Status(
RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 38d8f1df00..114f51071d 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -94,6 +94,8 @@ public:
return _operators[_operators.size() - 1]->row_desc();
}
+ PipelineId id() const { return _pipeline_id; }
+
private:
void _init_profile();
std::atomic<uint32_t> _complete_dependency;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 0cb7d24405..13e89e8766 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -295,21 +295,51 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl);
- for (int pip_id = _pipelines.size() - 1; pip_id >= 0; pip_id--) {
+ std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
+ for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges,
- _pipelines[pip_id]->operator_xs().front()->id(),
+ _pipelines[pip_idx]->operator_xs().front()->id(),
no_scan_ranges);
auto task = std::make_unique<PipelineXTask>(
- _pipelines[pip_id], _total_tasks++, _runtime_states[i].get(), this,
- _pipelines[pip_id]->pipeline_profile(), scan_ranges, local_params.sender_id);
+ _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
+ _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id);
+ pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
- _runtime_profile->add_child(_pipelines[pip_id]->pipeline_profile(), true, nullptr);
- if (pip_id < _pipelines.size() - 1) {
- task->set_upstream_dependency(_tasks[i].back()->get_downstream_dependency());
- }
+ _runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr);
_tasks[i].emplace_back(std::move(task));
}
+
+ /**
+ * Build DAG for pipeline tasks.
+ * For example, we have
+ *
+ * ExchangeSink (Pipeline1) JoinBuildSink (Pipeline2)
+ * \ /
+ * JoinProbeOperator1 (Pipeline1) JoinBuildSink (Pipeline3)
+ * \ /
+ * JoinProbeOperator2 (Pipeline1)
+ *
+ * In this fragment, we have three pipelines, and pipeline 1 depends on pipelines 2 and 3.
+ * To build this DAG, `_dag` manages the dependencies between pipelines by pipeline ID, and
+ * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
+ *
+ * Finally, we have two upstream dependencies in Pipeline1, corresponding to JoinProbeOperator1
+ * and JoinProbeOperator2.
+ */
+
+ for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+ auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+ DCHECK(task != nullptr);
+
+ if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
+ auto& deps = _dag[_pipelines[pip_idx]->id()];
+ for (auto& dep : deps) {
+ task->set_upstream_dependency(
+ pipeline_id_to_task[dep]->get_downstream_dependency());
+ }
+ }
+ }
}
// register the profile of child data stream sender
// for (auto& sender : _multi_cast_stream_sink_senders) {
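The task-construction loop above is now a two-pass scheme: the first pass creates one task per pipeline and indexes it in `pipeline_id_to_task`; the second pass walks `_dag` and hands each task the downstream-dependency objects of its upstream tasks. A reduced sketch of that control flow, with a hypothetical Task type standing in for PipelineXTask and raw pointers standing in for the dependency objects:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <memory>
    #include <vector>

    using PipelineId = uint32_t;

    struct Task { // hypothetical stand-in for PipelineXTask
        std::vector<Task*> upstream;
        void set_upstream_dependency(Task* dep) { upstream.push_back(dep); }
    };

    void build_tasks(const std::vector<PipelineId>& pipelines,
                     const std::map<PipelineId, std::vector<PipelineId>>& dag,
                     std::vector<std::unique_ptr<Task>>& tasks) {
        // Pass 1: create one task per pipeline and index it by pipeline ID.
        std::map<PipelineId, Task*> pipeline_id_to_task;
        for (PipelineId id : pipelines) {
            auto task = std::make_unique<Task>();
            pipeline_id_to_task.insert({id, task.get()});
            tasks.emplace_back(std::move(task));
        }
        // Pass 2: for each edge in the DAG, wire the downstream task to its upstream task.
        for (PipelineId id : pipelines) {
            Task* task = pipeline_id_to_task[id];
            auto it = dag.find(id);
            if (it == dag.end()) continue; // this pipeline has no upstream pipelines
            for (PipelineId dep : it->second) {
                assert(pipeline_id_to_task.count(dep) > 0);
                task->set_upstream_dependency(pipeline_id_to_task[dep]);
            }
        }
    }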
@@ -475,7 +505,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
"StreamingAggSourceXOperator"));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
cur_pipe = add_pipeline();
+ _dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -484,7 +519,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
op.reset(new AggSourceOperatorX(pool, tnode, descs, "AggSourceXOperator"));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
cur_pipe = add_pipeline();
+ _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
DataSinkOperatorXPtr sink;
sink.reset(new AggSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
@@ -496,7 +537,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
op.reset(new SortSourceOperatorX(pool, tnode, descs, "SortSourceXOperator"));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
cur_pipe = add_pipeline();
+ _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+
DataSinkOperatorXPtr sink;
sink.reset(new SortSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
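All three operator cases above (streaming agg, agg, sort) repeat one pattern: a blocking operator splits the plan into a source side that stays on `cur_pipe` and a sink side that gets a fresh pipeline, and the fresh pipeline is recorded as an upstream of the current one. The repeated fragment in isolation, exactly as it appears in the patch:

    // Record the edge "current (downstream) pipeline waits on the new pipeline"
    // before cur_pipe is switched to the sink side of the blocking operator.
    const auto downstream_pipeline_id = cur_pipe->id();
    if (_dag.find(downstream_pipeline_id) == _dag.end()) {
        _dag.insert({downstream_pipeline_id, {}});
    }
    cur_pipe = add_pipeline();                              // new upstream pipeline
    _dag[downstream_pipeline_id].push_back(cur_pipe->id()); // downstream waits on it

The find/insert guard ensures the entry exists with an empty vector before the push_back; `_dag[downstream_pipeline_id]` alone would also default-construct it.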
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index fd27464491..075b2bc931 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,6 +125,9 @@ private:
DataSinkOperatorXPtr _sink;
std::atomic_bool _canceled = false;
+
+ // `_dag` manages the dependencies between pipelines by pipeline ID
+ std::map<PipelineId, std::vector<PipelineId>> _dag;
};
} // namespace pipeline
} // namespace doris
diff --git a/be/src/vec/common/sort/vsort_exec_exprs.cpp b/be/src/vec/common/sort/vsort_exec_exprs.cpp
index 9b9b91426d..cb3aaa6d65 100644
--- a/be/src/vec/common/sort/vsort_exec_exprs.cpp
+++ b/be/src/vec/common/sort/vsort_exec_exprs.cpp
@@ -98,6 +98,7 @@ Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
RETURN_IF_ERROR(
_rhs_ordering_expr_ctxs[i]->clone(state, new_exprs._rhs_ordering_expr_ctxs[i]));
}
+ new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(_sort_tuple_slot_expr_ctxs[i]->clone(
state, new_exprs._sort_tuple_slot_expr_ctxs[i]));
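The vsort_exec_exprs.cpp change is a correctness fix: clone() indexes into new_exprs._sort_tuple_slot_expr_ctxs, so the destination vector must be resized to the source size before the loop runs; indexing an empty vector is undefined behavior. A minimal illustration with a stand-in context type:

    #include <cstddef>
    #include <vector>

    struct ExprCtx { int v = 0; }; // stand-in for the expression-context type

    void clone_into(const std::vector<ExprCtx>& src, std::vector<ExprCtx>& dst) {
        dst.resize(src.size()); // the added line: make room before the indexed writes
        for (std::size_t i = 0; i < src.size(); i++) {
            dst[i] = src[i]; // without the resize, dst[i] on an empty vector is UB
        }
    }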