This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 80541b0dd93fee5cdf8d4166a695c84b3b0e9960
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 18:12:28 2026 +0800

    [fix](local shuffle) remove incorrect upstream num_tasks raise in deferred exchanger creation
    
    _create_deferred_local_exchangers() was unconditionally raising the upstream
    pipeline's num_tasks to _num_instances before creating each exchanger. This
    was wrong for fan-out exchanges (PASSTHROUGH, BROADCAST, PASS_TO_ONE) above
    a serial/pooling scan, where the upstream should keep num_tasks=1 (one actual
    sender), not be inflated to _num_instances.
    
    The raise was never needed: _create_tree_helper already sets upstream
    num_tasks correctly via Pipeline::add_operator — serial operators call
    set_num_tasks(_parallel_instances), non-serial operators inherit the value
    from add_pipeline(). By the time _create_deferred_local_exchangers runs,
    sender_count is already accurate.
    
    The incorrect raise caused FE-planned PASSTHROUGH fan-out pipelines to have
    _num_instances sink tasks instead of 1, producing profile counts that were
    (_num_instances - 1) higher than BE-native planning. This was caught by the
    agg_finalize_serial_pooling_bucket and agg_finalize_serial_pooling_non_bucket
    regression cases in test_local_shuffle_fe_be_consistency.groovy.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index c935e157058..29a9a04c477 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -668,14 +668,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // Mirror _inherit_pipeline_properties() from the native _add_local_exchange_impl path:
-        // the upstream (SINK) pipeline should run with _num_instances tasks, not the
-        // serial-scan parallelism (_parallel_instances=1 for pooling scan).  At this point
-        // all operators have been added, so this is the final task-count assignment and
-        // cannot be overridden by a later add_operator call.
-        if (info.upstream_pipe->num_tasks() < _num_instances) {
-            info.upstream_pipe->set_num_tasks(_num_instances);
-        }
+        // upstream_pipe->num_tasks() is already the correct sender count:
+        // _create_tree_helper sets it via add_operator (serial operators call set_num_tasks with
+        // _parallel_instances, non-serial operators leave it at the value inherited from
+        // add_pipeline).  No adjustment is needed here.
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:

