This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase_wip
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 133432cbe59f61e4ececf1cf23063955938bd379
Author: 924060929 <[email protected]>
AuthorDate: Wed May 20 17:38:33 2026 +0800

    [doc](local shuffle) DANGER ZONE on sender_count to prevent repeated mistakes

    Two recent attempts to "fix" _create_deferred_local_exchangers'
    sender_count have regressed CI:

    - ef4ea66a66f changed it to std::max(num_tasks, _num_instances)
      reasoning about build 948971's _running_sink_operators=-5 hang.
      That broke MTMV refresh in build 949402 (LE id=5 PASSTHROUGH,
      _num_senders=6, only 1 close arrived, hang). Reverted in
      c848ecf08da.

    Add a multi-line DANGER ZONE comment right above the assignment that
    documents the formula, the two failed alternatives with the case
    names that regressed, why the BE-planned _add_local_exchange_impl
    analog is not actually analogous, and where to look first if a
    future hang has _running_sink_operators < 0.

    No code change — the comment is the patch.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 28 ++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 1cbc06f5e2d..05cf647a5be 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -718,6 +718,34 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
+        // DANGER ZONE — do not "fix" this line without reading the history.
+        //
+        // sender_count seeds Exchanger::_running_sink_operators, which the source side
+        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
+        // The correct value is THIS pipeline-instance's sink task count, which is exactly
+        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
+        //
+        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
+        // BE-planned path in _add_local_exchange_impl (~line 1023). THIS BREAKS the
+        // common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
+        // genuinely has num_tasks=1, only 1 close arrives, but seed becomes
+        // _num_instances so _running_sink_operators never reaches 0 — downstream
+        // sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
+        // mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402
+        // regressed exactly this way). BE-planned mode uses max() because its
+        // `cur_pipe` is the source-side pipeline (always raised to _num_instances by
+        // add_pipeline) — not analogous to our `upstream_pipe` here, which is the
+        // sink-side pipeline that may legitimately stay at 1 for serial sources.
+        //
+        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
+        // is shared across all instances. Same hang — each fragment-instance
+        // PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
+        // exchanger is per-instance, not per-BE. num_tasks() is already the right
+        // close-count for one instance.
+        //
+        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
+        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
+        // this fragment shape. Fix THAT pass, not this seed value.
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
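The counting invariant described in the DANGER ZONE comment can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `ToyExchanger`, `close_sink`, `source_ready`, and `final_count` are hypothetical names invented for illustration, not Doris APIs, and the real exchanger signals readiness through dependencies rather than a polled check. The sketch only shows the arithmetic: the seed must equal the number of sink closes that will actually arrive.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the exchanger's sink countdown; this is not Doris code.
struct ToyExchanger {
    std::atomic<int> running_sink_operators;

    // sender_count plays the role of the seed discussed in the comment.
    explicit ToyExchanger(int sender_count) : running_sink_operators(sender_count) {}

    // Models one sink task closing: decrement once and return the new value.
    int close_sink() { return running_sink_operators.fetch_sub(1) - 1; }

    // Assumption of this sketch: the source side may proceed only at exactly 0.
    bool source_ready() const { return running_sink_operators.load() == 0; }
};

// Seeds the counter with `seed`, delivers `closes` sink closes,
// and returns the counter value that remains afterwards.
int final_count(int seed, int closes) {
    ToyExchanger ex(seed);
    for (int i = 0; i < closes; ++i) {
        ex.close_sink();
    }
    return ex.running_sink_operators.load();
}
```

Under these assumptions, `final_count(1, 1)` ends at 0, so the source can proceed. `final_count(6, 1)` leaves 5 closes outstanding that will never arrive, which matches the hang shape reported for build 949402 (`_num_senders=6`, only 1 close). A seed lower than the number of closes, for example `final_count(1, 6)`, drives the counter negative; that is one arithmetic that yields -5, although the source does not state the actual counts behind build 948971.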
