This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ff0972f4c83b043eb6ee949350cace453850769c
Author: englefly <[email protected]>
AuthorDate: Thu Feb 5 20:39:17 2026 +0800

    support count pushdown. tpcds 14/ tpch13
    ds14: added agg push, execution time 4.7 -> 4.8
    h13: added agg push; this should restore p6 to p4 performance, from 10 sec back to 7 sec
---
 be/src/pipeline/exec/operator.cpp             |  4 ++
 be/src/pipeline/pipeline_fragment_context.cpp | 57 ++++++++++++++++++++++++---
 2 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 26b63443dd6..5ef03bc5d07 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,6 +62,8 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/queue_sink_operator.h"
+#include "pipeline/exec/queue_source_operator.h"
 #include "pipeline/exec/rec_cte_anchor_sink_operator.h"
 #include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/exec/rec_cte_sink_operator.h"
@@ -809,6 +811,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
 DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
+DECLARE_OPERATOR(QueueSinkLocalState)
 DECLARE_OPERATOR(DictSinkLocalState)
 DECLARE_OPERATOR(RecCTESinkLocalState)
 DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
@@ -845,6 +848,7 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(QueueSourceLocalState)
 DECLARE_OPERATOR(RecCTESourceLocalState)
 DECLARE_OPERATOR(RecCTEScanLocalState)
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 83196022f4e..75627d85b83 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,6 +82,8 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/exec/queue_sink_operator.h"
+#include "pipeline/exec/queue_source_operator.h"
 #include "pipeline/exec/rec_cte_anchor_sink_operator.h"
 #include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/exec/rec_cte_sink_operator.h"
@@ -1314,12 +1316,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 cur_pipe = new_pipe;
             } else {
-                op = std::make_shared<DistinctStreamingAggOperatorX>(
-                        pool, next_operator_id(), tnode, descs, _require_bucket_distribution);
-                op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-                _require_bucket_distribution =
-                        _require_bucket_distribution || op->require_data_distribution();
-                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+                // Check if parent is SetProbeSinkOperatorX by checking if the sink is a SetProbeSinkOperatorX
+                bool parent_is_set_probe = false;
+                if (cur_pipe->sink() != nullptr) {
+                    auto sink = cur_pipe->sink();
+                    // Try to dynamic_cast to both template instances of SetProbeSinkOperatorX
+                    if (dynamic_cast<SetProbeSinkOperatorX<true>*>(sink) ||
+                        dynamic_cast<SetProbeSinkOperatorX<false>*>(sink)) {
+                        parent_is_set_probe = true;
+                    }
+                }
+
+                if (parent_is_set_probe) {
+                    // Create QueueSourceOperatorX
+                    auto queue_source_id = next_operator_id();
+                    OperatorPtr queue_source_op = std::make_shared<QueueSourceOperatorX>(
+                            pool, tnode.node_id, queue_source_id);
+                    RETURN_IF_ERROR(cur_pipe->add_operator(queue_source_op, _parallel_instances));
+
+                    // Create new pipeline for QueueSinkOperatorX
+                    const auto downstream_pipeline_id = cur_pipe->id();
+                    if (!_dag.contains(downstream_pipeline_id)) {
+                        _dag.insert({downstream_pipeline_id, {}});
+                    }
+                    PipelinePtr queue_side_pipe = add_pipeline(cur_pipe);
+                    _dag[downstream_pipeline_id].push_back(queue_side_pipe->id());
+
+                    // Create QueueSinkOperatorX
+                    auto queue_sink_id = next_sink_operator_id();
+                    DataSinkOperatorPtr queue_sink_op = std::make_shared<QueueSinkOperatorX>(
+                            queue_sink_id, queue_source_id, queue_source_op->operator_id());
+                    RETURN_IF_ERROR(queue_side_pipe->set_sink(queue_sink_op));
+
+                    // Create DistinctStreamingAggOperatorX in the new pipeline
+                    // Note: we assign to op so that _create_tree_helper will set its child correctly
+                    op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
+                                                                         tnode, descs, _require_bucket_distribution);
+                    RETURN_IF_ERROR(queue_side_pipe->add_operator(op, _parallel_instances));
+                    RETURN_IF_ERROR(queue_source_op->set_child(op));
+                    cur_pipe = queue_side_pipe;
+                } else {
+                    op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
+                                                                         tnode, descs, _require_bucket_distribution);
+                    op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
+                    _require_bucket_distribution =
+                            _require_bucket_distribution || op->require_data_distribution();
+                    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
+                }
             }
         } else if (is_streaming_agg) {
             if (need_create_cache_op) {
@@ -1721,6 +1764,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
 
+        // Create appropriate sink operator based on child_id
         DataSinkOperatorPtr sink;
         if (child_id == 0) {
             sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(),
@@ -1729,6 +1773,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
             sink.reset(new SetProbeSinkOperatorX<is_intersect>(
                     child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
         }
+        // Common code for both cases
         sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to