924060929 opened a new pull request, #63366:
URL: https://github.com/apache/doris/pull/63366
Previously, local exchange (LE) nodes were inserted exclusively by the
BE's `_plan_local_exchange` at pipeline build time. The FE had no
visibility into which operators needed a fan-out or shuffle before
execution, making it impossible to validate, optimize, or override LE
decisions at planning time.
This PR introduces a full FE-side local exchange planner that mirrors
BE semantics, brings several correctness fixes, and leaves the legacy
BE path fully intact behind a feature flag. See "Current architecture
notes" at the bottom for what the FE planner does and does not own.
A new `AddLocalExchange` pass runs after normal fragment assignment.
It walks each fragment's plan tree bottom-up, calling the polymorphic
`PlanNode.enforceAndDeriveLocalExchange()` on every node. Nodes
declare what distribution they require of their children; the framework
inserts `LocalExchangeNode` where needed.
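The bottom-up walk described above can be sketched roughly as follows. This is an illustrative Python model, not the FE's Java code: the `PlanNode` dataclass, the `child_require` field, the string-valued distributions, and the rule that a node passes through its first child's distribution are all simplifying assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class PlanNode:
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    # Distribution required of every child; None means "no requirement".
    child_require: Optional[str] = None


def enforce_and_derive(node: PlanNode) -> Tuple[PlanNode, str]:
    """Visit children first, then satisfy this node's requirement.

    Returns the (mutated) node and the distribution its output carries.
    """
    child_outputs = []
    for i, child in enumerate(node.children):
        child, output = enforce_and_derive(child)
        if node.child_require is not None and output != node.child_require:
            # Requirement not met: wrap the child in a local exchange.
            child = PlanNode(f"LocalExchange[{node.child_require}]", [child])
            output = node.child_require
        node.children[i] = child
        child_outputs.append(output)
    # Simplification: a node passes through its first child's distribution,
    # and leaves report "ANY".
    return node, (child_outputs[0] if child_outputs else "ANY")


def render(node: PlanNode) -> List[str]:
    """Flatten the tree top-down (pre-order) for easy inspection."""
    names = [node.name]
    for child in node.children:
        names.extend(render(child))
    return names
```

In this model, an `Agg` that requires `HASH_SHUFFLE` over a plain `Scan` gets a `LocalExchange[HASH_SHUFFLE]` inserted between them, while a second `Agg` stacked on top sees its requirement already satisfied and gets nothing.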
`LocalExchangeNode` represents intra-fragment data redistribution and
supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE,
BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST,
PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP.
The pass is guarded by `enable_local_shuffle_planner` (default true).
When disabled, BE continues to run its own `_plan_local_exchange` as
before, keeping the old path fully intact.
The planner keys its decisions on `maxPerBeInstances` (the maximum number
of pipeline instances assigned to any single BE) rather than a global
`instanceCount`. Planning is a no-op when `maxPerBeInstances == 1`,
because inserting an LE on a single-threaded pipeline would cause
task-count mismatches and pipeline starvation.
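To see why the per-BE maximum matters more than a global count, here is a small Python sketch. The instance-to-backend mapping and both function names are hypothetical, and treating an empty assignment as "skip planning" is an assumption the PR text does not cover.

```python
from collections import Counter
from typing import Dict


def max_per_be_instances(instance_to_backend: Dict[str, str]) -> int:
    """Largest number of pipeline instances placed on any single backend."""
    if not instance_to_backend:
        return 0
    per_backend = Counter(instance_to_backend.values())
    return max(per_backend.values())


def should_plan_local_exchange(instance_to_backend: Dict[str, str]) -> bool:
    """Planning is skipped when no backend runs more than one instance."""
    # Assumption: an empty assignment is also treated as "nothing to plan".
    return max_per_be_instances(instance_to_backend) > 1
```

With three instances spread over three backends, a global count would report 3, yet every backend runs a single instance, so this sketch skips planning. Two instances on the same backend are enough to enable it.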
When a serial operator (e.g. OlapScanNode with a single tablet bucket)
feeds a non-serial parent without an intermediate LE, downstream tasks
starve waiting for data that never arrives. The framework detects this
case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly
matching BE's `required_data_distribution()` serial → PASSTHROUGH rule.
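A minimal Python sketch of the serial-to-non-serial fan-out rule, under stated assumptions: the `Op` dataclass and the `fan_out_serial_children` helper are hypothetical, and the inserted exchange is modeled as non-serial from its parent's point of view, which simplifies how an LE actually splits a pipeline.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Op:
    name: str
    serial: bool = False
    children: List["Op"] = field(default_factory=list)


def fan_out_serial_children(node: Op) -> Op:
    """Insert a PASSTHROUGH exchange above every serial child of a non-serial parent."""
    fixed = []
    for child in node.children:
        child = fan_out_serial_children(child)  # handle the subtree first
        if child.serial and not node.serial:
            # The exchange restores N-task parallelism for the parent.
            child = Op("LocalExchange[PASSTHROUGH]", serial=False, children=[child])
        fixed.append(child)
    node.children = fixed
    return node
```

A serial scan under a non-serial join gets wrapped, while a serial child under a serial parent is left alone because both sides already run with one task.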
`LocalExchangeTypeRequire` abstracts two strategies:
- `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE`
(safe for intra-fragment hash partitioning).
- `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE /
GLOBAL_EXECUTION_HASH_SHUFFLE without degradation.
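The two strategies can be pictured as follows. This is a hedged Python sketch: the class names and enum values come from the description above, but the `resolve()` method name and the constructor shape are assumptions for illustration, not the FE's actual Java API.

```python
from enum import Enum


class ExchangeType(Enum):
    LOCAL_EXECUTION_HASH_SHUFFLE = "LOCAL_EXECUTION_HASH_SHUFFLE"
    BUCKET_HASH_SHUFFLE = "BUCKET_HASH_SHUFFLE"
    GLOBAL_EXECUTION_HASH_SHUFFLE = "GLOBAL_EXECUTION_HASH_SHUFFLE"


class RequireHash:
    """Any hash partitioning will do, so pick the intra-fragment one."""

    def resolve(self) -> ExchangeType:
        return ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE


class RequireSpecific:
    """The caller needs exactly this exchange type; never downgrade it."""

    def __init__(self, exchange_type: ExchangeType) -> None:
        self.exchange_type = exchange_type

    def resolve(self) -> ExchangeType:
        return self.exchange_type
```

The point of the split is that a generic hash requirement can safely collapse to the local variant, whereas a bucket or global hash requirement must survive unchanged.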
PR #62438 added `enable_local_exchange_before_agg`, but its BE guard
`!_needs_finalize && !enable_local_exchange_before_agg → base` conflated
two semantically different cases in AggSink and DistinctStreamingAgg:
- **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg
(performance-only) and FIRST_MERGE dedup (correctness-critical).
The flag-gated early return wrongly skipped HASH for FIRST_MERGE,
leaving a PASSTHROUGH over a serial child and producing wrong
aggregation results.
- **DistinctStreamingAgg**: `!finalize` covered both streaming preagg
(`useStreamingPreagg=true`, performance) and non-streaming dedup
(`useStreamingPreagg=false`, correctness). Same class of bug.
FE fix:
- AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL
phases. FIRST_MERGE always emits HASH regardless of the flag.
- DistinctStreamingAgg: restrict to `useStreamingPreagg=true`.
Non-streaming dedup always emits HASH.
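The difference between the old guard and the restricted one can be written out as small predicates. This Python sketch covers only keyed, non-final aggregation as described above. The function names and the "BASE"/"HASH" string results are hypothetical, and returning "HASH" for every case outside the guard is an assumption.

```python
def old_guard_path(needs_finalize: bool, flag_enabled: bool) -> str:
    """Guard quoted above: any non-final agg takes the base path when the flag is off."""
    if not needs_finalize and not flag_enabled:
        return "BASE"
    return "HASH"


def fixed_agg_sink_path(needs_finalize: bool, is_merge: bool, flag_enabled: bool) -> str:
    """Restricted guard: only non-merge (LOCAL) phases may take the base path."""
    if not needs_finalize and not is_merge and not flag_enabled:
        return "BASE"
    return "HASH"


def fixed_distinct_path(use_streaming_preagg: bool, flag_enabled: bool) -> str:
    """Non-final DistinctStreamingAgg: only streaming preagg may take the base path."""
    if use_streaming_preagg and not flag_enabled:
        return "BASE"
    return "HASH"
```

The case that changes is FIRST_MERGE with the flag off: the old guard returns the base path, while the restricted guard returns HASH.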
The PR also adds `requiresShuffleForCorrectness()`, mirroring BE's
`is_shuffled_operator()`, so that SetOperationNode propagates the
"downstream depends on hash" flag correctly. The previous check,
`parentRequire.preferType().isHashShuffle()`, was coarser and
over-inserted a HASH LE on every union branch under a streaming preagg.
These fixes reduce FE/BE consistency mismatches from 8 to 3
(only pre-existing NLJ optimization differences remain).
Session variables:
- `enable_local_shuffle_planner`: use the FE planner (default true)
- `enable_local_shuffle`: master switch for local shuffle
- `enable_local_exchange_before_agg`: HASH LE before non-final agg
(default true, mirrors apache/doris#62438)
`validateNoSerialWithoutLocalExchange()` walks the final plan tree and
logs a warning whenever a serial operator feeds a non-serial parent
without an intermediate LocalExchangeNode, catching planning gaps
before execution.
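A validation walk of this kind might look like the sketch below. It is written in Python for illustration and assumes a hypothetical `Op` dataclass with `serial` and `is_local_exchange` flags. It returns the offending edges in addition to logging them so the result is easy to inspect.

```python
import logging
from dataclasses import dataclass, field
from typing import List, Tuple

log = logging.getLogger("local_exchange_validation")


@dataclass
class Op:
    name: str
    serial: bool = False
    is_local_exchange: bool = False
    children: List["Op"] = field(default_factory=list)


def find_unprotected_serial_edges(node: Op) -> List[Tuple[str, str]]:
    """Collect (child, parent) pairs where a serial child feeds a non-serial parent directly."""
    gaps = []
    for child in node.children:
        if child.serial and not node.serial and not node.is_local_exchange:
            log.warning("serial %s feeds non-serial %s without a local exchange",
                        child.name, node.name)
            gaps.append((child.name, node.name))
        gaps.extend(find_unprotected_serial_edges(child))
    return gaps
```

A serial scan directly under an aggregation is reported, while the same scan behind a local exchange node is not.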
Regression tests:
- `test_enable_local_exchange_before_agg.groovy`: 10 agg patterns
with the flag on and off; covers the FIRST_MERGE and
DistinctStreamingAgg correctness fixes.
- `test_local_shuffle_fe_be_consistency.groovy`: runs the same SQL
with `enable_local_shuffle_planner=true` and `=false` across the
full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union,
TableFunction, AssertNumRows, RQG-derived corner cases) and asserts
result rows are identical. Only data correctness is asserted: the
two planners legitimately differ on the exact exchange counts/types
they emit, so plan-shape equality is intentionally not checked.
- `test_local_shuffle_rqg_bugs.groovy`: reproduces 20+ RQG-found
crashes and wrong-result cases.
- `test_old_coordinator_local_shuffle.groovy`: verifies the old
coordinator path is unaffected.
- `test_multilevel_join_agg_local_shuffle.groovy`: multi-level join
and aggregation plan shapes.
Build fixes:
- `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in
libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
- `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`)
on macOS.
- `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` →
`TLocalPartitionType::NOOP` after thrift enum rename.
Current architecture notes:
This PR puts the FE planner in the driver's seat for LE insertion but
intentionally does NOT remove the BE-side machinery. Readers should be
aware of three pieces the FE planner shares with or defers to BE:
1. **`is_serial_operator` is computed on both sides.** FE computes the
flag and writes it into Thrift, but BE's
`OperatorBase::is_serial_operator()` is still overridden per operator
in C++ and used for BE-side runtime decisions. Any future change to
the BE override needs to be mirrored on the FE side (and vice versa)
to keep the planner's view consistent with execution.
2. **The legacy BE planner stays as a fallback.**
`pipeline_fragment_context.cpp::_plan_local_exchange` is preserved
and gated by `runtime_state.h::plan_local_shuffle()`: when
`enable_local_shuffle_planner=false`, BE plans LE itself, exactly as
before. The two paths are mutually exclusive, never both running on
the same query.
3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety
net.** The two propagation passes in
`pipeline_fragment_context.cpp` fix up paired pipelines whose
`num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline
splits leave a serial Exchange feeding an N-task sink). FE's
framework-level serial→non-serial fan-out (`enforceRequire` step 3)
and the `validateNoSerialWithoutLocalExchange` check aim to make
these mismatches impossible by construction, but the BE-side fixup
remains as a defensive guard.
Co-authored-by: Gabriel <[email protected]>