924060929 opened a new pull request, #63366:
URL: https://github.com/apache/doris/pull/63366

   Previously, local exchange (LE) nodes were inserted exclusively by the
   BE's `_plan_local_exchange` at pipeline build time.  The FE had no
   visibility into which operators needed a fan-out or shuffle before
   execution, making it impossible to validate, optimize, or override LE
   decisions at planning time.
   
   This PR introduces a full FE-side local exchange planner that mirrors
   BE semantics, brings several correctness fixes, and leaves the legacy
   BE path fully intact behind a feature flag.  See "Current architecture
   notes" at the bottom for what the FE planner does and does not own.
   
   A new `AddLocalExchange` pass runs after normal fragment assignment.
   It walks each fragment's plan tree bottom-up, calling the polymorphic
   `PlanNode.enforceAndDeriveLocalExchange()` on every node.  Nodes
   declare what distribution they require of their children; the framework
   inserts `LocalExchangeNode` where needed.
   
   `LocalExchangeNode` represents intra-fragment data redistribution and
   supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE,
   BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST,
   PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP.
   
   The pass is guarded by `enable_local_shuffle_planner` (default true).
   When disabled, BE continues to run its own `_plan_local_exchange` as
   before, keeping the old path fully intact.
   
   `maxPerBeInstances` (max pipeline instances assigned to any single BE)
   is used instead of a global `instanceCount`.  Planning is a no-op when
   `maxPerBeInstances == 1` — inserting LE on a single-threaded pipeline
   would cause task-count mismatches and pipeline starvation.
   
   When a serial operator (e.g. OlapScanNode with a single tablet bucket)
   feeds a non-serial parent without an intermediate LE, downstream tasks
   starve waiting for data that never arrives.  The framework detects this
   case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly
   matching BE's `required_data_distribution()` serial → PASSTHROUGH rule.
   
   `LocalExchangeTypeRequire` abstracts two strategies:
   - `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE`
     (safe for intra-fragment hash partitioning).
   - `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE /
     GLOBAL_EXECUTION_HASH_SHUFFLE without degradation.
   
   PR #62438 added `enable_local_exchange_before_agg`, but its BE guard
   `!_needs_finalize && !enable_local_exchange_before_agg → base` conflated
   two semantically different cases in AggSink and DistinctStreamingAgg:
   
   - **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg
     (performance-only) and FIRST_MERGE dedup (correctness-critical).
     The flag-gated early-return wrongly skipped HASH for FIRST_MERGE,
     producing PASSTHROUGH-over-serial-child → wrong aggregation results.
   
   - **DistinctStreamingAgg**: `!finalize` covered both streaming preagg
     (`useStreamingPreagg=true`, performance) and non-streaming dedup
     (`useStreamingPreagg=false`, correctness).  Same class of bug.
   
   FE fix:
   - AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL
     phases.  FIRST_MERGE always emits HASH regardless of the flag.
   - DistinctStreamingAgg: restrict to `useStreamingPreagg=true`.
     Non-streaming dedup always emits HASH.
   
   Also add `requiresShuffleForCorrectness()` to mirror BE's
   `is_shuffled_operator()`, so SetOperationNode propagates the
   "downstream depends on hash" flag correctly instead of using the
   coarser `parentRequire.preferType().isHashShuffle()` check that
   over-inserted HASH LE on every union branch under a streaming preagg.
   
   These fixes reduce FE/BE consistency mismatches from 8 to 3
   (only pre-existing NLJ optimization differences remain).
   
   - `enable_local_shuffle_planner` — use FE planner (default true)
   - `enable_local_shuffle` — master switch for local shuffle
   - `enable_local_exchange_before_agg` — HASH LE before non-final agg
     (default true, mirrors apache/doris#62438)
   
   `validateNoSerialWithoutLocalExchange()` walks the final plan tree and
   logs a warning whenever a serial operator feeds a non-serial parent
   without an intermediate LocalExchangeNode, catching planning gaps
   before execution.
   
   - `test_enable_local_exchange_before_agg.groovy` — 10 agg patterns
     with the flag on and off; covers the FIRST_MERGE and
     DistinctStreamingAgg correctness fixes.
   - `test_local_shuffle_fe_be_consistency.groovy` — runs the same SQL
     with `enable_local_shuffle_planner=true` and `=false` across the
     full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union,
     TableFunction, AssertNumRows, RQG-derived corner cases) and asserts
     result rows are identical.  Only data correctness is asserted — the
     two planners legitimately differ on the exact exchange counts/types
     they emit, so plan-shape equality is intentionally not checked.
   - `test_local_shuffle_rqg_bugs.groovy` — reproduces 20+ RQG-found
     crashes and wrong-result cases.
   - `test_old_coordinator_local_shuffle.groovy` — verifies the old
     coordinator path is unaffected.
   - `test_multilevel_join_agg_local_shuffle.groovy` — multi-level join
     and aggregation plan shapes.
   
   - `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in
     libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
   - `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`)
     on macOS.
   - `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` →
     `TLocalPartitionType::NOOP` after thrift enum rename.
   
   This PR puts the FE planner in the driver's seat for LE insertion but
   intentionally does NOT remove the BE-side machinery — readers should be
   aware of three pieces the FE planner shares with or defers to BE:
   
   1. **`is_serial_operator` is computed on both sides.**  FE computes the
      flag and writes it into Thrift, but BE's
      `OperatorBase::is_serial_operator()` is still overridden per operator
      in C++ and used for BE-side runtime decisions.  Any future change to
      the BE override needs to be mirrored on the FE side (and vice versa)
      to keep the planner's view consistent with execution.
   
   2. **The legacy BE planner stays as a fallback.**
      `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved
      and gated by `runtime_state.h::plan_local_shuffle()`: when
      `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as
      before.  The two paths are mutually exclusive, never both running on
      the same query.
   
   3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety
      net.**  The two propagation passes in
      `pipeline_fragment_context.cpp` fix up paired pipelines whose
      `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline
      splits leave a serial Exchange feeding an N-task sink).  FE's
      framework-level serial→non-serial fan-out (`enforceRequire` step 3)
      and the `validateNoSerialWithoutLocalExchange` check aim to make
      these mismatches impossible by construction, but the BE-side fixup
      remains as a defensive guard.
   
   Co-authored-by: Gabriel <[email protected]>
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to