This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1313cce67 [refactor](pipelineX) refine _build_side_pipelines (#25871)
ad1313cce67 is described below

commit ad1313cce67e6342f253bed0da6fe0ca42197492
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 26 10:32:23 2023 +0800

    [refactor](pipelineX) refine _build_side_pipelines (#25871)
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 38 +++++-----------------
 .../pipeline_x/pipeline_x_fragment_context.h       | 25 +++++++++++---
 2 files changed, 28 insertions(+), 35 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a1f69e6f02b..78a888af9df 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -473,9 +473,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                      _runtime_states[i].get()});
         }
     }
-    _build_side_pipelines.clear();
-    _union_child_pipelines.clear();
-    _set_child_pipelines.clear();
+    _pipeline_parent_map.clear();
     _dag.clear();
     _op_id_to_le_state.clear();
 
@@ -589,16 +587,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                                   const DescriptorTbl& descs, OperatorXPtr& op,
                                                   PipelinePtr& cur_pipe, int parent_idx,
                                                   int child_idx) {
-    if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end() && child_idx > 0) {
-        cur_pipe = _build_side_pipelines[parent_idx];
-    }
-    if (_union_child_pipelines.find(parent_idx) != _union_child_pipelines.end()) {
-        cur_pipe = _union_child_pipelines[parent_idx][child_idx];
-    }
-    if (_set_child_pipelines.find(parent_idx) != _set_child_pipelines.end()) {
-        cur_pipe = _set_child_pipelines[parent_idx][child_idx];
-    }
-
+    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
     std::stringstream error_msg;
     switch (tnode.node_type) {
     case TPlanNodeType::OLAP_SCAN_NODE: {
@@ -704,7 +693,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
-        _build_side_pipelines.insert({sink->node_id(), build_side_pipe});
+        _pipeline_parent_map.push(op->node_id(), cur_pipe);
+        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -723,7 +713,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
-        _build_side_pipelines.insert({sink->node_id(), build_side_pipe});
+        _pipeline_parent_map.push(op->node_id(), cur_pipe);
+        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         break;
     }
     case TPlanNodeType::UNION_NODE: {
@@ -735,7 +726,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        int father_id = tnode.node_id;
         for (int i = 0; i < child_count; i++) {
             PipelinePtr build_side_pipe = add_pipeline();
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
@@ -745,13 +735,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
             RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
             // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
-            if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) {
-                _union_child_pipelines.insert({father_id, {build_side_pipe}});
-            } else {
-                _union_child_pipelines[father_id].push_back(build_side_pipe);
-            }
+            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         }
-
         break;
     }
     case TPlanNodeType::SORT_NODE: {
@@ -877,8 +862,6 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
         _dag.insert({downstream_pipeline_id, {}});
     }
 
-    int parent_id = tnode.node_id;
-
     for (int child_id = 0; child_id < tnode.num_children; child_id++) {
         PipelinePtr probe_side_pipe = add_pipeline();
         _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
@@ -895,12 +878,7 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
         // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
-        if (child_id == 0) {
-            DCHECK(_set_child_pipelines.find(parent_id) == _set_child_pipelines.end());
-            _set_child_pipelines.insert({parent_id, {probe_side_pipe}});
-        } else {
-            _set_child_pipelines[parent_id].push_back(probe_side_pipe);
-        }
+        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 45eb7f48cc5..4d2a59277e9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -175,14 +175,29 @@ private:
     // build probe operator and build operator in separate pipelines. To do this, we should build
     // ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator
     // is in, so we can build BuildSide once we complete probe side.
-    std::map<int, PipelinePtr> _build_side_pipelines;
+    struct pipeline_parent_map {
+        std::map<int, std::vector<PipelinePtr>> _build_side_pipelines;
+        void push(int parent_node_id, PipelinePtr pipeline) {
+            if (!_build_side_pipelines.contains(parent_node_id)) {
+                _build_side_pipelines.insert({parent_node_id, {pipeline}});
+            } else {
+                _build_side_pipelines[parent_node_id].push_back(pipeline);
+            }
+        }
+        void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) {
+            if (!_build_side_pipelines.contains(parent_node_id)) {
+                return;
+            }
+            DCHECK(_build_side_pipelines.contains(parent_node_id));
+            auto& child_pipeline = _build_side_pipelines[parent_node_id];
+            DCHECK(child_idx < child_pipeline.size());
+            cur_pipe = child_pipeline[child_idx];
+        }
+        void clear() { _build_side_pipelines.clear(); }
+    } _pipeline_parent_map;
 
     std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
     std::mutex _state_map_lock;
-
-    // TODO: Unify `_union_child_pipelines`, `_set_child_pipelines`, `_build_side_pipelines`.
-    std::map<int, std::vector<PipelinePtr>> _union_child_pipelines;
-    std::map<int, std::vector<PipelinePtr>> _set_child_pipelines;
     // We can guarantee that a plan node ID can correspond to an operator ID,
     // but some operators do not have a corresponding plan node ID.
     // We set these IDs as negative numbers, which are not visible to the user.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to