This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 29146c680e [refactor](profile)add node id info in pipeline profile (#21823)
29146c680e is described below
commit 29146c680ed41286fd81f694346e11d9b70739da
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jul 17 15:24:02 2023 +0800
[refactor](profile)add node id info in pipeline profile (#21823)
---
be/src/pipeline/pipeline_fragment_context.cpp | 90 +++++++++++++--------------
be/src/pipeline/pipeline_task.cpp | 11 ++--
2 files changed, 49 insertions(+), 52 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 48316a5008..3c4957ad40 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -441,15 +441,14 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
case TPlanNodeType::FILE_SCAN_NODE:
case TPlanNodeType::META_SCAN_NODE:
case TPlanNodeType::ES_SCAN_NODE: {
- OperatorBuilderPtr operator_t =
-
std::make_shared<ScanOperatorBuilder>(next_operator_builder_id(), node);
+ OperatorBuilderPtr operator_t =
std::make_shared<ScanOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::MYSQL_SCAN_NODE: {
#ifdef DORIS_WITH_MYSQL
OperatorBuilderPtr operator_t =
-
std::make_shared<MysqlScanOperatorBuilder>(next_operator_builder_id(), node);
+ std::make_shared<MysqlScanOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
#else
@@ -459,25 +458,24 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
OperatorBuilderPtr operator_t =
-
std::make_shared<SchemaScanOperatorBuilder>(next_operator_builder_id(), node);
+ std::make_shared<SchemaScanOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
OperatorBuilderPtr operator_t =
-
std::make_shared<ExchangeSourceOperatorBuilder>(next_operator_builder_id(),
node);
+ std::make_shared<ExchangeSourceOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::EMPTY_SET_NODE: {
OperatorBuilderPtr operator_t =
-
std::make_shared<EmptySetSourceOperatorBuilder>(next_operator_builder_id(),
node);
+ std::make_shared<EmptySetSourceOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
case TPlanNodeType::DATA_GEN_SCAN_NODE: {
- OperatorBuilderPtr operator_t =
-
std::make_shared<DataGenOperatorBuilder>(next_operator_builder_id(), node);
+ OperatorBuilderPtr operator_t =
std::make_shared<DataGenOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
break;
}
@@ -486,7 +484,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
if (union_node->children_count() == 0 &&
union_node->get_first_materialized_child_idx() == 0) { // only
have const expr
OperatorBuilderPtr builder =
-
std::make_shared<ConstValueOperatorBuilder>(next_operator_builder_id(), node);
+ std::make_shared<ConstValueOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
} else {
int child_count = union_node->children_count();
@@ -495,11 +493,11 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto new_child_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id),
new_child_pipeline));
OperatorBuilderPtr child_sink_builder =
std::make_shared<UnionSinkOperatorBuilder>(
- next_operator_builder_id(), child_id, union_node,
data_queue);
+ union_node->id(), child_id, union_node, data_queue);
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
}
OperatorBuilderPtr source_builder =
std::make_shared<UnionSourceOperatorBuilder>(
- next_operator_builder_id(), union_node, data_queue);
+ node->id(), union_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(source_builder));
}
break;
@@ -511,19 +509,19 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
if (agg_node->is_streaming_preagg()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<StreamingAggSinkOperatorBuilder>(
- next_operator_builder_id(), agg_node, data_queue);
+ node->id(), agg_node, data_queue);
RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
OperatorBuilderPtr pre_agg_source =
std::make_shared<StreamingAggSourceOperatorBuilder>(
- next_operator_builder_id(), agg_node, data_queue);
+ node->id(), agg_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else {
OperatorBuilderPtr agg_sink =
-
std::make_shared<AggSinkOperatorBuilder>(next_operator_builder_id(), agg_node);
+ std::make_shared<AggSinkOperatorBuilder>(node->id(),
agg_node);
RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
- OperatorBuilderPtr agg_source =
std::make_shared<AggSourceOperatorBuilder>(
- next_operator_builder_id(), agg_node);
+ OperatorBuilderPtr agg_source =
+ std::make_shared<AggSourceOperatorBuilder>(node->id(),
agg_node);
RETURN_IF_ERROR(cur_pipe->add_operator(agg_source));
}
break;
@@ -532,12 +530,11 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto new_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
- OperatorBuilderPtr sort_sink =
-
std::make_shared<SortSinkOperatorBuilder>(next_operator_builder_id(), node);
+ OperatorBuilderPtr sort_sink =
std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
OperatorBuilderPtr sort_source =
-
std::make_shared<SortSourceOperatorBuilder>(next_operator_builder_id(), node);
+ std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
break;
}
@@ -545,13 +542,12 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
auto new_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
- OperatorBuilderPtr partition_sort_sink =
std::make_shared<PartitionSortSinkOperatorBuilder>(
- next_operator_builder_id(), node);
+ OperatorBuilderPtr partition_sort_sink =
+ std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
OperatorBuilderPtr partition_sort_source =
-
std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(),
- node);
+
std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source));
break;
}
@@ -560,32 +556,31 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
OperatorBuilderPtr analytic_sink =
-
std::make_shared<AnalyticSinkOperatorBuilder>(next_operator_builder_id(), node);
+ std::make_shared<AnalyticSinkOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
OperatorBuilderPtr analytic_source =
-
std::make_shared<AnalyticSourceOperatorBuilder>(next_operator_builder_id(),
node);
+ std::make_shared<AnalyticSourceOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(analytic_source));
break;
}
case TPlanNodeType::REPEAT_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
- OperatorBuilderPtr builder =
-
std::make_shared<RepeatOperatorBuilder>(next_operator_builder_id(), node);
+ OperatorBuilderPtr builder =
std::make_shared<RepeatOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =
-
std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(),
node);
+ std::make_shared<AssertNumRowsOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
case TPlanNodeType::TABLE_FUNCTION_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr builder =
-
std::make_shared<TableFunctionOperatorBuilder>(next_operator_builder_id(),
node);
+ std::make_shared<TableFunctionOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
@@ -596,16 +591,16 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
} else {
OperatorBuilderPtr builder =
std::make_shared<EmptySourceOperatorBuilder>(
- next_operator_builder_id(), node->child(1)->row_desc(),
node->child(1));
+ node->child(1)->id(), node->child(1)->row_desc(),
node->child(1));
new_pipe->add_operator(builder);
}
OperatorBuilderPtr join_sink =
-
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(),
join_node);
+ std::make_shared<HashJoinBuildSinkBuilder>(node->id(),
join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
- OperatorBuilderPtr join_source =
std::make_shared<HashJoinProbeOperatorBuilder>(
- next_operator_builder_id(), join_node);
+ OperatorBuilderPtr join_source =
+ std::make_shared<HashJoinProbeOperatorBuilder>(node->id(),
join_node);
RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
cur_pipe->add_dependency(new_pipe);
@@ -614,13 +609,13 @@ Status
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
case TPlanNodeType::CROSS_JOIN_NODE: {
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
- OperatorBuilderPtr join_sink =
std::make_shared<NestLoopJoinBuildOperatorBuilder>(
- next_operator_builder_id(), node);
+ OperatorBuilderPtr join_sink =
+ std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
- OperatorBuilderPtr join_source =
std::make_shared<NestLoopJoinProbeOperatorBuilder>(
- next_operator_builder_id(), node);
+ OperatorBuilderPtr join_source =
+ std::make_shared<NestLoopJoinProbeOperatorBuilder>(node->id(),
node);
RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
cur_pipe->add_dependency(new_pipe);
@@ -636,8 +631,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
}
case TPlanNodeType::SELECT_NODE: {
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
- OperatorBuilderPtr builder =
-
std::make_shared<SelectOperatorBuilder>(next_operator_builder_id(), node);
+ OperatorBuilderPtr builder =
std::make_shared<SelectOperatorBuilder>(node->id(), node);
RETURN_IF_ERROR(cur_pipe->add_operator(builder));
break;
}
@@ -653,21 +647,21 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
PipelinePtr cur_pipe) {
auto build_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
- OperatorBuilderPtr sink_builder =
std::make_shared<SetSinkOperatorBuilder<is_intersect>>(
- next_operator_builder_id(), node);
+ OperatorBuilderPtr sink_builder =
+ std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(),
node);
RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
for (int child_id = 1; child_id < node->children_count(); ++child_id) {
auto probe_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(child_id),
probe_pipeline));
OperatorBuilderPtr probe_sink_builder =
- std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(
- next_operator_builder_id(), child_id, node);
+
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(),
child_id,
+
node);
RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
}
- OperatorBuilderPtr source_builder =
std::make_shared<SetSourceOperatorBuilder<is_intersect>>(
- next_operator_builder_id(), node);
+ OperatorBuilderPtr source_builder =
+
std::make_shared<SetSourceOperatorBuilder<is_intersect>>(node->id(), node);
return cur_pipe->add_operator(source_builder);
}
@@ -737,7 +731,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
OperatorBuilderPtr sink_;
switch (thrift_sink.type) {
case TDataSinkType::DATA_STREAM_SINK: {
- sink_ =
std::make_shared<ExchangeSinkOperatorBuilder>(next_operator_builder_id(),
+ sink_ =
std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id,
_sink.get(),
this);
break;
}
@@ -758,8 +752,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
- sink_ =
std::make_shared<ResultFileSinkOperatorBuilder>(next_operator_builder_id(),
- _sink.get());
+ sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(
+ thrift_sink.result_file_sink.dest_node_id, _sink.get());
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index c44cec514c..a4eac4a93d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -109,15 +109,18 @@ Status PipelineTask::prepare(RuntimeState* state) {
RETURN_IF_ERROR(o->prepare(state));
}
- _task_profile->add_info_string("Sink", fmt::format("{}({})",
_sink->get_name(), _sink->id()));
+ _task_profile->add_info_string("Sink",
+ fmt::format("{}(dst_id={})",
_sink->get_name(), _sink->id()));
fmt::memory_buffer operator_ids_str;
for (size_t i = 0; i < _operators.size(); i++) {
if (i == 0) {
- fmt::format_to(operator_ids_str,
- fmt::format("[{}({})", _operators[i]->get_name(),
_operators[i]->id()));
+ fmt::format_to(
+ operator_ids_str,
+ fmt::format("[{}(node_id={})", _operators[i]->get_name(),
_operators[i]->id()));
} else {
fmt::format_to(operator_ids_str,
- fmt::format(", {}({})", _operators[i]->get_name(),
_operators[i]->id()));
+ fmt::format(", {}(node_id={})",
_operators[i]->get_name(),
+ _operators[i]->id()));
}
}
fmt::format_to(operator_ids_str, "]");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]