This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 29146c680e [refactor](profile)add node id info in pipeline profile 
(#21823)
29146c680e is described below

commit 29146c680ed41286fd81f694346e11d9b70739da
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jul 17 15:24:02 2023 +0800

    [refactor](profile)add node id info in pipeline profile (#21823)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 90 +++++++++++++--------------
 be/src/pipeline/pipeline_task.cpp             | 11 ++--
 2 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 48316a5008..3c4957ad40 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -441,15 +441,14 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     case TPlanNodeType::FILE_SCAN_NODE:
     case TPlanNodeType::META_SCAN_NODE:
     case TPlanNodeType::ES_SCAN_NODE: {
-        OperatorBuilderPtr operator_t =
-                
std::make_shared<ScanOperatorBuilder>(next_operator_builder_id(), node);
+        OperatorBuilderPtr operator_t = 
std::make_shared<ScanOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
     case TPlanNodeType::MYSQL_SCAN_NODE: {
 #ifdef DORIS_WITH_MYSQL
         OperatorBuilderPtr operator_t =
-                
std::make_shared<MysqlScanOperatorBuilder>(next_operator_builder_id(), node);
+                std::make_shared<MysqlScanOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
 #else
@@ -459,25 +458,24 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
         OperatorBuilderPtr operator_t =
-                
std::make_shared<SchemaScanOperatorBuilder>(next_operator_builder_id(), node);
+                std::make_shared<SchemaScanOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
     case TPlanNodeType::EXCHANGE_NODE: {
         OperatorBuilderPtr operator_t =
-                
std::make_shared<ExchangeSourceOperatorBuilder>(next_operator_builder_id(), 
node);
+                std::make_shared<ExchangeSourceOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
     case TPlanNodeType::EMPTY_SET_NODE: {
         OperatorBuilderPtr operator_t =
-                
std::make_shared<EmptySetSourceOperatorBuilder>(next_operator_builder_id(), 
node);
+                std::make_shared<EmptySetSourceOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
     case TPlanNodeType::DATA_GEN_SCAN_NODE: {
-        OperatorBuilderPtr operator_t =
-                
std::make_shared<DataGenOperatorBuilder>(next_operator_builder_id(), node);
+        OperatorBuilderPtr operator_t = 
std::make_shared<DataGenOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
         break;
     }
@@ -486,7 +484,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         if (union_node->children_count() == 0 &&
             union_node->get_first_materialized_child_idx() == 0) { // only 
have const expr
             OperatorBuilderPtr builder =
-                    
std::make_shared<ConstValueOperatorBuilder>(next_operator_builder_id(), node);
+                    std::make_shared<ConstValueOperatorBuilder>(node->id(), 
node);
             RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         } else {
             int child_count = union_node->children_count();
@@ -495,11 +493,11 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
                 auto new_child_pipeline = add_pipeline();
                 RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), 
new_child_pipeline));
                 OperatorBuilderPtr child_sink_builder = 
std::make_shared<UnionSinkOperatorBuilder>(
-                        next_operator_builder_id(), child_id, union_node, 
data_queue);
+                        union_node->id(), child_id, union_node, data_queue);
                 
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
             }
             OperatorBuilderPtr source_builder = 
std::make_shared<UnionSourceOperatorBuilder>(
-                    next_operator_builder_id(), union_node, data_queue);
+                    node->id(), union_node, data_queue);
             RETURN_IF_ERROR(cur_pipe->add_operator(source_builder));
         }
         break;
@@ -511,19 +509,19 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         if (agg_node->is_streaming_preagg()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
-                    next_operator_builder_id(), agg_node, data_queue);
+                    node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
 
             OperatorBuilderPtr pre_agg_source = 
std::make_shared<StreamingAggSourceOperatorBuilder>(
-                    next_operator_builder_id(), agg_node, data_queue);
+                    node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
         } else {
             OperatorBuilderPtr agg_sink =
-                    
std::make_shared<AggSinkOperatorBuilder>(next_operator_builder_id(), agg_node);
+                    std::make_shared<AggSinkOperatorBuilder>(node->id(), 
agg_node);
             RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
 
-            OperatorBuilderPtr agg_source = 
std::make_shared<AggSourceOperatorBuilder>(
-                    next_operator_builder_id(), agg_node);
+            OperatorBuilderPtr agg_source =
+                    std::make_shared<AggSourceOperatorBuilder>(node->id(), 
agg_node);
             RETURN_IF_ERROR(cur_pipe->add_operator(agg_source));
         }
         break;
@@ -532,12 +530,11 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         auto new_pipeline = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
 
-        OperatorBuilderPtr sort_sink =
-                
std::make_shared<SortSinkOperatorBuilder>(next_operator_builder_id(), node);
+        OperatorBuilderPtr sort_sink = 
std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
 
         OperatorBuilderPtr sort_source =
-                
std::make_shared<SortSourceOperatorBuilder>(next_operator_builder_id(), node);
+                std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
         break;
     }
@@ -545,13 +542,12 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         auto new_pipeline = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
 
-        OperatorBuilderPtr partition_sort_sink = 
std::make_shared<PartitionSortSinkOperatorBuilder>(
-                next_operator_builder_id(), node);
+        OperatorBuilderPtr partition_sort_sink =
+                std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
 
         OperatorBuilderPtr partition_sort_source =
-                
std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(),
-                                                                     node);
+                
std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source));
         break;
     }
@@ -560,32 +556,31 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
 
         OperatorBuilderPtr analytic_sink =
-                
std::make_shared<AnalyticSinkOperatorBuilder>(next_operator_builder_id(), node);
+                std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
 
         OperatorBuilderPtr analytic_source =
-                
std::make_shared<AnalyticSourceOperatorBuilder>(next_operator_builder_id(), 
node);
+                std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(analytic_source));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
-        OperatorBuilderPtr builder =
-                
std::make_shared<RepeatOperatorBuilder>(next_operator_builder_id(), node);
+        OperatorBuilderPtr builder = 
std::make_shared<RepeatOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         break;
     }
     case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr builder =
-                
std::make_shared<AssertNumRowsOperatorBuilder>(next_operator_builder_id(), 
node);
+                std::make_shared<AssertNumRowsOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         break;
     }
     case TPlanNodeType::TABLE_FUNCTION_NODE: {
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr builder =
-                
std::make_shared<TableFunctionOperatorBuilder>(next_operator_builder_id(), 
node);
+                std::make_shared<TableFunctionOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         break;
     }
@@ -596,16 +591,16 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
             RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
         } else {
             OperatorBuilderPtr builder = 
std::make_shared<EmptySourceOperatorBuilder>(
-                    next_operator_builder_id(), node->child(1)->row_desc(), 
node->child(1));
+                    node->child(1)->id(), node->child(1)->row_desc(), 
node->child(1));
             new_pipe->add_operator(builder);
         }
         OperatorBuilderPtr join_sink =
-                
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), 
join_node);
+                std::make_shared<HashJoinBuildSinkBuilder>(node->id(), 
join_node);
         RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
-        OperatorBuilderPtr join_source = 
std::make_shared<HashJoinProbeOperatorBuilder>(
-                next_operator_builder_id(), join_node);
+        OperatorBuilderPtr join_source =
+                std::make_shared<HashJoinProbeOperatorBuilder>(node->id(), 
join_node);
         RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
 
         cur_pipe->add_dependency(new_pipe);
@@ -614,13 +609,13 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     case TPlanNodeType::CROSS_JOIN_NODE: {
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
-        OperatorBuilderPtr join_sink = 
std::make_shared<NestLoopJoinBuildOperatorBuilder>(
-                next_operator_builder_id(), node);
+        OperatorBuilderPtr join_sink =
+                std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
-        OperatorBuilderPtr join_source = 
std::make_shared<NestLoopJoinProbeOperatorBuilder>(
-                next_operator_builder_id(), node);
+        OperatorBuilderPtr join_source =
+                std::make_shared<NestLoopJoinProbeOperatorBuilder>(node->id(), 
node);
         RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
 
         cur_pipe->add_dependency(new_pipe);
@@ -636,8 +631,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
     }
     case TPlanNodeType::SELECT_NODE: {
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
-        OperatorBuilderPtr builder =
-                
std::make_shared<SelectOperatorBuilder>(next_operator_builder_id(), node);
+        OperatorBuilderPtr builder = 
std::make_shared<SelectOperatorBuilder>(node->id(), node);
         RETURN_IF_ERROR(cur_pipe->add_operator(builder));
         break;
     }
@@ -653,21 +647,21 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
                                                                         
PipelinePtr cur_pipe) {
     auto build_pipeline = add_pipeline();
     RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
-    OperatorBuilderPtr sink_builder = 
std::make_shared<SetSinkOperatorBuilder<is_intersect>>(
-            next_operator_builder_id(), node);
+    OperatorBuilderPtr sink_builder =
+            std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), 
node);
     RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
 
     for (int child_id = 1; child_id < node->children_count(); ++child_id) {
         auto probe_pipeline = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(child_id), 
probe_pipeline));
         OperatorBuilderPtr probe_sink_builder =
-                std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(
-                        next_operator_builder_id(), child_id, node);
+                
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), 
child_id,
+                                                                            
node);
         RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
     }
 
-    OperatorBuilderPtr source_builder = 
std::make_shared<SetSourceOperatorBuilder<is_intersect>>(
-            next_operator_builder_id(), node);
+    OperatorBuilderPtr source_builder =
+            
std::make_shared<SetSourceOperatorBuilder<is_intersect>>(node->id(), node);
     return cur_pipe->add_operator(source_builder);
 }
 
@@ -737,7 +731,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
     OperatorBuilderPtr sink_;
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
-        sink_ = 
std::make_shared<ExchangeSinkOperatorBuilder>(next_operator_builder_id(),
+        sink_ = 
std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id,
                                                               _sink.get(), 
this);
         break;
     }
@@ -758,8 +752,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, 
const TDataSink& thr
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
-        sink_ = 
std::make_shared<ResultFileSinkOperatorBuilder>(next_operator_builder_id(),
-                                                                _sink.get());
+        sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(
+                thrift_sink.result_file_sink.dest_node_id, _sink.get());
         break;
     }
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index c44cec514c..a4eac4a93d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -109,15 +109,18 @@ Status PipelineTask::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(o->prepare(state));
     }
 
-    _task_profile->add_info_string("Sink", fmt::format("{}({})", 
_sink->get_name(), _sink->id()));
+    _task_profile->add_info_string("Sink",
+                                   fmt::format("{}(dst_id={})", 
_sink->get_name(), _sink->id()));
     fmt::memory_buffer operator_ids_str;
     for (size_t i = 0; i < _operators.size(); i++) {
         if (i == 0) {
-            fmt::format_to(operator_ids_str,
-                           fmt::format("[{}({})", _operators[i]->get_name(), 
_operators[i]->id()));
+            fmt::format_to(
+                    operator_ids_str,
+                    fmt::format("[{}(node_id={})", _operators[i]->get_name(), 
_operators[i]->id()));
         } else {
             fmt::format_to(operator_ids_str,
-                           fmt::format(", {}({})", _operators[i]->get_name(), 
_operators[i]->id()));
+                           fmt::format(", {}(node_id={})", 
_operators[i]->get_name(),
+                                       _operators[i]->id()));
         }
     }
     fmt::format_to(operator_ids_str, "]");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to