This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0f74793c1dfd701188ddafc4c6220c10a37467c2 Author: zhangstar333 <[email protected]> AuthorDate: Mon Jul 17 15:24:02 2023 +0800 [refactor](profile)add node id info in pipeline profile (#21823) --- be/src/pipeline/pipeline_fragment_context.cpp | 90 +++++++++++++-------------- be/src/pipeline/pipeline_task.cpp | 11 ++-- 2 files changed, 49 insertions(+), 52 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index ea0a6c9240..69848be82a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -438,15 +438,14 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::META_SCAN_NODE: case TPlanNodeType::ES_SCAN_NODE: { - OperatorBuilderPtr operator_t = - std::make_shared<ScanOperatorBuilder>(next_operator_builder_id(), node); + OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::MYSQL_SCAN_NODE: { #ifdef DORIS_WITH_MYSQL OperatorBuilderPtr operator_t = - std::make_shared<MysqlScanOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<MysqlScanOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; #else @@ -456,25 +455,24 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } case TPlanNodeType::SCHEMA_SCAN_NODE: { OperatorBuilderPtr operator_t = - std::make_shared<SchemaScanOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<SchemaScanOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::EXCHANGE_NODE: { OperatorBuilderPtr operator_t = - std::make_shared<ExchangeSourceOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<ExchangeSourceOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::EMPTY_SET_NODE: { OperatorBuilderPtr operator_t = - std::make_shared<EmptySetSourceOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<EmptySetSourceOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } case TPlanNodeType::DATA_GEN_SCAN_NODE: { - OperatorBuilderPtr operator_t = - std::make_shared<DataGenOperatorBuilder>(next_operator_builder_id(), node); + OperatorBuilderPtr operator_t = std::make_shared<DataGenOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(operator_t)); break; } @@ -483,7 +481,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur if (union_node->children_count() == 0 && union_node->get_first_materialized_child_idx() == 0) { // only have const expr OperatorBuilderPtr builder = - std::make_shared<ConstValueOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<ConstValueOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); } else { int child_count = union_node->children_count(); @@ -492,11 +490,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto new_child_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline)); OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>( - next_operator_builder_id(), child_id, union_node, data_queue); + union_node->id(), child_id, union_node, data_queue); RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder)); } OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>( - next_operator_builder_id(), union_node, data_queue); + node->id(), union_node, data_queue); RETURN_IF_ERROR(cur_pipe->add_operator(source_builder)); } break; @@ -508,19 +506,19 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur if (agg_node->is_streaming_preagg()) { auto data_queue = std::make_shared<DataQueue>(1); OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>( - next_operator_builder_id(), agg_node, data_queue); + node->id(), agg_node, data_queue); RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink)); OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>( - next_operator_builder_id(), agg_node, data_queue); + node->id(), agg_node, data_queue); RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); } else { OperatorBuilderPtr agg_sink = - std::make_shared<AggSinkOperatorBuilder>(next_operator_builder_id(), agg_node); + std::make_shared<AggSinkOperatorBuilder>(node->id(), agg_node); RETURN_IF_ERROR(new_pipe->set_sink(agg_sink)); - OperatorBuilderPtr agg_source = std::make_shared<AggSourceOperatorBuilder>( - next_operator_builder_id(), agg_node); + OperatorBuilderPtr agg_source = + std::make_shared<AggSourceOperatorBuilder>(node->id(), agg_node); RETURN_IF_ERROR(cur_pipe->add_operator(agg_source)); } break; @@ -529,12 +527,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto new_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); - OperatorBuilderPtr sort_sink = - std::make_shared<SortSinkOperatorBuilder>(next_operator_builder_id(), node); + OperatorBuilderPtr sort_sink = std::make_shared<SortSinkOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink)); OperatorBuilderPtr sort_source = - std::make_shared<SortSourceOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<SortSourceOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(sort_source)); break; } @@ -542,13 +539,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur auto new_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); - OperatorBuilderPtr partition_sort_sink = std::make_shared<PartitionSortSinkOperatorBuilder>( - next_operator_builder_id(), node); + OperatorBuilderPtr partition_sort_sink = + std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink)); OperatorBuilderPtr partition_sort_source = - std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(), - node); + std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source)); break; } @@ -557,32 +553,31 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); OperatorBuilderPtr analytic_sink = - std::make_shared<AnalyticSinkOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink)); OperatorBuilderPtr analytic_source = - std::make_shared<AnalyticSourceOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(analytic_source)); break; } case TPlanNodeType::REPEAT_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); - OperatorBuilderPtr builder = - std::make_shared<RepeatOperatorBuilder>(next_operator_builder_id(), node); + OperatorBuilderPtr builder = std::make_shared<RepeatOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr builder = - std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<AssertNumRowsOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } case TPlanNodeType::TABLE_FUNCTION_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr builder = - std::make_shared<TableFunctionOperatorBuilder>(next_operator_builder_id(), node); + std::make_shared<TableFunctionOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } @@ -593,16 +588,16 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); } else { OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>( - next_operator_builder_id(), node->child(1)->row_desc(), node->child(1)); + node->child(1)->id(), node->child(1)->row_desc(), node->child(1)); new_pipe->add_operator(builder); } OperatorBuilderPtr join_sink = - std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node); + std::make_shared<HashJoinBuildSinkBuilder>(node->id(), join_node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); - OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>( - next_operator_builder_id(), join_node); + OperatorBuilderPtr join_source = + std::make_shared<HashJoinProbeOperatorBuilder>(node->id(), join_node); RETURN_IF_ERROR(cur_pipe->add_operator(join_source)); cur_pipe->add_dependency(new_pipe); @@ -611,13 +606,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur case TPlanNodeType::CROSS_JOIN_NODE: { auto new_pipe = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); - OperatorBuilderPtr join_sink = std::make_shared<NestLoopJoinBuildOperatorBuilder>( - next_operator_builder_id(), node); + OperatorBuilderPtr join_sink = + std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); - OperatorBuilderPtr join_source = std::make_shared<NestLoopJoinProbeOperatorBuilder>( - next_operator_builder_id(), node); + OperatorBuilderPtr join_source = + std::make_shared<NestLoopJoinProbeOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(join_source)); cur_pipe->add_dependency(new_pipe); @@ -633,8 +628,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } case TPlanNodeType::SELECT_NODE: { RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); - OperatorBuilderPtr builder = - std::make_shared<SelectOperatorBuilder>(next_operator_builder_id(), node); + OperatorBuilderPtr builder = std::make_shared<SelectOperatorBuilder>(node->id(), node); RETURN_IF_ERROR(cur_pipe->add_operator(builder)); break; } @@ -650,21 +644,21 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode PipelinePtr cur_pipe) { auto build_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline)); - OperatorBuilderPtr sink_builder = std::make_shared<SetSinkOperatorBuilder<is_intersect>>( - next_operator_builder_id(), node); + OperatorBuilderPtr sink_builder = + std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node); RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder)); for (int child_id = 1; child_id < node->children_count(); ++child_id) { auto probe_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline)); OperatorBuilderPtr probe_sink_builder = - std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>( - next_operator_builder_id(), child_id, node); + std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id, + node); RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder)); } - OperatorBuilderPtr source_builder = std::make_shared<SetSourceOperatorBuilder<is_intersect>>( - next_operator_builder_id(), node); + OperatorBuilderPtr source_builder = + std::make_shared<SetSourceOperatorBuilder<is_intersect>>(node->id(), node); return cur_pipe->add_operator(source_builder); } @@ -734,7 +728,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr OperatorBuilderPtr sink_; switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: { - sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(next_operator_builder_id(), + sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id, _sink.get(), this); break; } @@ -755,8 +749,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr break; } case TDataSinkType::RESULT_FILE_SINK: { - sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(next_operator_builder_id(), - _sink.get()); + sink_ = std::make_shared<ResultFileSinkOperatorBuilder>( + thrift_sink.result_file_sink.dest_node_id, _sink.get()); break; } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 2d9e45004b..e48d729f46 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -94,15 +94,18 @@ Status PipelineTask::prepare(RuntimeState* state) { RETURN_IF_ERROR(o->prepare(state)); } - _task_profile->add_info_string("Sink", fmt::format("{}({})", _sink->get_name(), _sink->id())); + _task_profile->add_info_string("Sink", + fmt::format("{}(dst_id={})", _sink->get_name(), _sink->id())); fmt::memory_buffer operator_ids_str; for (size_t i = 0; i < _operators.size(); i++) { if (i == 0) { - fmt::format_to(operator_ids_str, - fmt::format("[{}({})", _operators[i]->get_name(), _operators[i]->id())); + fmt::format_to( + operator_ids_str, + fmt::format("[{}(node_id={})", _operators[i]->get_name(), _operators[i]->id())); } else { fmt::format_to(operator_ids_str, - fmt::format(", {}({})", _operators[i]->get_name(), _operators[i]->id())); + fmt::format(", {}(node_id={})", _operators[i]->get_name(), + _operators[i]->id())); } } fmt::format_to(operator_ids_str, "]"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
