This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 80541b0dd93fee5cdf8d4166a695c84b3b0e9960
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 18:12:28 2026 +0800

    [fix](local shuffle) remove incorrect upstream num_tasks raise in deferred exchanger creation

    _create_deferred_local_exchangers() raised the upstream pipeline's
    num_tasks to _num_instances (whenever it was lower) before creating each
    exchanger, regardless of the exchange type. This was wrong for fan-out
    exchanges (PASSTHROUGH, BROADCAST, PASS_TO_ONE) above a serial/pooling
    scan, where the upstream should keep num_tasks=1 (one actual sender)
    rather than be inflated to _num_instances.

    The raise was never needed: _create_tree_helper already sets the upstream
    num_tasks correctly via Pipeline::add_operator. Serial operators call
    set_num_tasks(_parallel_instances); non-serial operators keep the value
    inherited from add_pipeline(). By the time
    _create_deferred_local_exchangers() runs, sender_count is already accurate.

    The incorrect raise caused FE-planned PASSTHROUGH fan-out pipelines to
    have _num_instances sink tasks instead of 1, producing profile counts
    that were (_num_instances - 1) higher than with BE-native planning. This
    was caught by the agg_finalize_serial_pooling_bucket and
    agg_finalize_serial_pooling_non_bucket regression cases in
    test_local_shuffle_fe_be_consistency.groovy.
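The task-count reasoning above can be illustrated with a small toy model. This is only a sketch under stated assumptions: ToyPipeline, toy_add_operator, sender_count_with_raise and sender_count_without_raise are hypothetical names invented for illustration, not the Doris classes or functions, and the model covers only the num_tasks bookkeeping described in the commit message.

```cpp
#include <cassert>

// Hypothetical stand-in for a pipeline; NOT the real Doris Pipeline class.
struct ToyPipeline {
    int num_tasks = 1;

    void set_num_tasks(int n) {
        assert(n > 0);  // a pipeline needs at least one task
        num_tasks = n;
    }
};

// Models add_operator as described in the commit message (an assumption of
// this sketch): a serial operator pins the pipeline to parallel_instances
// tasks, a non-serial operator leaves the inherited value untouched.
void toy_add_operator(ToyPipeline& pipe, bool is_serial, int parallel_instances) {
    if (is_serial) {
        pipe.set_num_tasks(parallel_instances);
    }
}

// Removed behaviour: raise the upstream task count to num_instances before
// reading the sender count. Takes the pipeline by value so the caller's copy
// is not modified.
int sender_count_with_raise(ToyPipeline pipe, int num_instances) {
    if (pipe.num_tasks < num_instances) {
        pipe.set_num_tasks(num_instances);
    }
    return pipe.num_tasks;
}

// Behaviour after this commit: trust the value that add_operator left behind.
int sender_count_without_raise(const ToyPipeline& pipe) {
    return pipe.num_tasks;
}
```

In this model, a pipeline that inherits 4 tasks and then receives a serial scan with parallel_instances = 1 ends up with num_tasks = 1. sender_count_without_raise reports 1 sender, while sender_count_with_raise reports 4, which is the (_num_instances - 1) surplus the message describes. A pipeline with only non-serial operators reports 4 either way.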
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index c935e157058..29a9a04c477 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -668,14 +668,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // Mirror _inherit_pipeline_properties() from the native _add_local_exchange_impl path:
-        // the upstream (SINK) pipeline should run with _num_instances tasks, not the
-        // serial-scan parallelism (_parallel_instances=1 for pooling scan). At this point
-        // all operators have been added, so this is the final task-count assignment and
-        // cannot be overridden by a later add_operator call.
-        if (info.upstream_pipe->num_tasks() < _num_instances) {
-            info.upstream_pipe->set_num_tasks(_num_instances);
-        }
+        // upstream_pipe->num_tasks() is already the correct sender count:
+        // _create_tree_helper sets it via add_operator (serial operators call set_num_tasks with
+        // _parallel_instances, non-serial operators leave it at the value inherited from
+        // add_pipeline). No adjustment is needed here.
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
