github-actions[bot] commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3259079337


##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1598,6 +1600,12 @@ public enum IgnoreSplitType {
                     "Whether to enable local shuffle on pipelineX engine."}, needForward = true)
     private boolean enableLocalShuffle = true;
 
+    @VarAttrDef.VarAttr(
+            name = ENABLE_LOCAL_SHUFFLE_PLANNER, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"是否在FE规划Local Shuffle",
+                    "Whether to plan local shuffle in frontend"}, needForward = true)
+    private boolean enableLocalShufflePlanner = true;
+

Review Comment:
   With this default, the FE planner serializes the new
   `LOCAL_EXCHANGE_NODE`/`TLocalPartitionType` protocol to every BE as soon as
   the FE is upgraded. During a rolling upgrade, an old BE has no
   `TPlanNodeType::LOCAL_EXCHANGE_NODE` handling in `_create_operator` and will
   fail such fragments, while `RuntimeState::plan_local_shuffle()` also
   disables the legacy BE planner because `enable_local_shuffle_planner` is set.
   Please gate this path on BE capability/version (or keep the default off until
   all BEs support the new node) so that mixed FE/BE deployments can still
   execute queries during the upgrade.
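
The gating requested in this comment could look roughly like the sketch below. It assumes a per-BE capability flag; `BackendInfo`, `supportsLocalExchangeNode`, and `shouldPlanLocalShuffleInFe` are hypothetical names for illustration, not existing Doris APIs, and how the FE would learn a BE's capability is left open.

```java
import java.util.List;

// Hypothetical sketch: none of these types or methods exist in Doris under these names.
final class LocalShufflePlannerGate {

    /** Illustrative stand-in for what the FE knows about one backend. */
    static final class BackendInfo {
        final String host;
        // Assumed capability flag: true once the BE can handle LOCAL_EXCHANGE_NODE.
        final boolean supportsLocalExchangeNode;

        BackendInfo(String host, boolean supportsLocalExchangeNode) {
            this.host = host;
            this.supportsLocalExchangeNode = supportsLocalExchangeNode;
        }
    }

    /**
     * Plan local shuffle in FE only if the session variable is on and every
     * backend in the list understands the new node type. Returning false
     * means the caller should keep using the legacy BE-side planning.
     */
    static boolean shouldPlanLocalShuffleInFe(boolean sessionVarEnabled, List<BackendInfo> backends) {
        if (!sessionVarEnabled || backends.isEmpty()) {
            return false;
        }
        for (BackendInfo be : backends) {
            if (!be.supportsLocalExchangeNode) {
                return false; // one old BE is enough to disable the new path
            }
        }
        return true;
    }
}
```

With a check of this shape, a cluster in the middle of a rolling upgrade keeps the legacy path until the last old BE is replaced, regardless of the session variable's default.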



##########
fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java:
##########
@@ -933,4 +1020,229 @@ private String mergeIcebergAccessPathsWithId(
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (int i = 0; i < children.size(); i++) {
+            Pair<PlanNode, LocalExchangeType> childOutput
+                    = enforceRequire(translatorContext, children.get(i), i, LocalExchangeTypeRequire.noRequire());
+            newChildren.add(childOutput.first);
+        }
+        this.children = newChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Unified framework method: propagate serial flag → recurse child → satisfy check → Layer 1 skip → insert LE.
+     * Replaces the old enforceChild/enforceChildExchange/forceEnforceChildExchange trio.
+     *
+     * <h3>Data flow</h3>
+     * <ul>
+     *   <li><b>serial-ancestor flag</b> ({@link PlanTranslatorContext#hasSerialAncestorInPipeline})
+     *       — flows root → leaf during traversal.  Mirrors BE's
+     *       {@code any_of(operators[idx..end], is_serial_operator)} check used by
+     *       {@code _add_local_exchange} to skip LE insertion when an ancestor in the same
+     *       pipeline is already serial.  Reset at pipeline boundaries via
+     *       {@link #shouldResetSerialFlagForChild}.</li>
+     *   <li><b>shuffle-for-correctness flag</b>
+     *       ({@link PlanTranslatorContext#hasShuffleForCorrectnessAncestor}) — also flows
+     *       root → leaf.  Mirrors BE's {@code _followed_by_shuffled_operator}: tells a
+     *       child whether some downstream operator depends on hash distribution for
+     *       correctness, so {@code SetOperationNode} can pre-shuffle union branches.</li>
+     *   <li><b>return value</b> {@code Pair<PlanNode, LocalExchangeType>} — first is the
+     *       (possibly LE-wrapped) child; second is the actual output distribution as
+     *       observed by the parent.  Caller's {@code require.satisfy(output)} decides
+     *       whether more LE is needed.</li>
+     *   <li><b>parent.require</b> describes the constraint on the child output —
+     *       computed inside the parent's {@code enforceAndDeriveLocalExchange} per child.</li>
+     * </ul>
+     *
+     * <h3>Invariants</h3>
+     * <ul>
+     *   <li>Every serial → non-serial transition has an LE somewhere between them
+     *       (enforced by framework step 3 below, validated by
+     *       {@link AddLocalExchange#validateNoSerialWithoutLocalExchange}).</li>
+     *   <li>{@code LocalExchangeNode} itself is always non-serial — setting it serial
+     *       would defeat its purpose of fanning a 1-task pipeline back to N tasks.</li>
+     *   <li>For pipeline-breaking parents ({@code shouldResetSerialFlagForChild=true}),
+     *       the child starts a fresh pipeline so {@code hasSerialAncestor} is reset; the
+     *       node's own {@code isSerialNode()} still composes in for the child's view.</li>
+     *   <li>{@code RequireHash} accepts any hash flavour; {@code RequireSpecific} demands
+     *       an exact match (with the one PASSTHROUGH/ADAPTIVE_PASSTHROUGH compatibility).
+     *       Pick the looser one whenever correctness allows — see
+     *       {@link LocalExchangeNode.LocalExchangeTypeRequire}.</li>
+     * </ul>
+     *
+     * <h3>Layers</h3>
+     * Layer 1 (shouldSkipLE): mirrors BE's need_to_local_exchange — skip when this node or
+     * an ancestor in the same pipeline is serial (operators[idx..end] has serial → skip).
+     * Layer 2 (require/output): each Node declares require and output in enforceAndDeriveLocalExchange.
+     */
+    protected Pair<PlanNode, LocalExchangeType> enforceRequire(
+            PlanTranslatorContext translatorContext, PlanNode child, int childIndex,
+            LocalExchangeTypeRequire require) {
+        // 1. Propagate serial-ancestor flag to child.
+        // For pipeline-splitting operators (shouldReset=true, e.g. non-streaming AGG):
+        //   Drop inherited serial flag from parent (parent is in a different pipeline),
+        //   but keep this node's own serial status (child is in the same pipeline as this
+        //   node's sink, e.g. Exchange is in AGG_Sink pipeline).
+        // For non-splitting operators (shouldReset=false, e.g. streaming AGG):
+        //   Inherit parent's serial flag + this node's own.
+        boolean inheritedSerial = shouldResetSerialFlagForChild(childIndex)
+                ? false : translatorContext.hasSerialAncestorInPipeline(this);
+        boolean childHasSerialAncestor = inheritedSerial || isSerialNode();
+        translatorContext.setHasSerialAncestorInPipeline(child, childHasSerialAncestor);
+
+        // 1b. Propagate shuffle-for-correctness-ancestor flag to child.
+        // Mirrors BE's _followed_by_shuffled_operator: a downstream operator needs hash
+        // distribution for correctness, and the chain to here goes through HASH or NOOP
+        // requirements (so the dependency is preserved).
+        //   propagate = ((inheritedShuffled || self.requiresShuffleForCorrectness)
+        //                && require is hash)
+        //            || (inheritedShuffled && require is noop/passthrough)
+        boolean inheritedShuffled = translatorContext.hasShuffleForCorrectnessAncestor(this);
+        boolean selfOrInheritedShuffled = inheritedShuffled || requiresShuffleForCorrectness();
+        boolean requireIsHash = require.preferType().isHashShuffle();
+        boolean requireIsNoop = require.preferType() == LocalExchangeNode.LocalExchangeType.NOOP;
+        boolean childShuffledAncestor = (selfOrInheritedShuffled && requireIsHash)
+                || (inheritedShuffled && requireIsNoop);
+        translatorContext.setHasShuffleForCorrectnessAncestor(child, childShuffledAncestor);
+
+        // 2. Recurse child (Layer 2: child declares its own require/output)
+        Pair<PlanNode, LocalExchangeType> childOutput =
+                child.enforceAndDeriveLocalExchange(translatorContext, this, require);
+
+        // Steps 2.5 and 3 both react to a serial child but address different concerns:
+        //   - Step 2.5 rewrites the OUTPUT-side view (what we tell satisfy/parent about
+        //     the child's actual distribution).  A serial pipeline runs with 1 task so
+        //     its distribution claim is meaningless — flatten to NOOP so the satisfy
+        //     check below doesn't get fooled by a stale "I output BUCKET_HASH" claim.
+        //   - Step 3 rewrites the REQUIRE-side decision (what we want from the child).
+        //     If we previously asked for nothing (noRequire) but the child turns out
+        //     to be serial and we're not, upgrade to requirePassthrough so an LE is
+        //     inserted to restore parallelism.
+
+        // 2.5. Serial child override (output side): if child is serial on BE, force its
+        //      reported output to NOOP.  Distribution is irrelevant when the child runs
+        //      with 1 task; downstream parallelism is restored either by step 3 (LE
+        //      insertion) or skipped entirely by step 4b (we're also serial).
+        if (childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            childOutput = Pair.of(childOutput.first, LocalExchangeType.NOOP);
+        }
+
+        // 3. Framework-level serial child check (require side, mirrors BE base class
+        //    required_data_distribution): if child will be serial on BE but this node is
+        //    not serial, the pipeline has a 1-task serial child feeding an N-task non-serial
+        //    parent.  Without LE, pipeline splits (AGG/JOIN) create paired pipelines with
+        //    mismatched num_tasks → crash.  Upgrade noRequire to requirePassthrough so an
+        //    LE is inserted below to restore parallelism.
+        if (require instanceof LocalExchangeNode.NoRequire
+                && childOutput.first.isSerialOperatorOnBe(translatorContext.getConnectContext())
+                && !isSerialOperatorOnBe(translatorContext.getConnectContext())) {
+            require = LocalExchangeTypeRequire.requirePassthrough();
+        }
+
+        // 4. Satisfy check: child output meets requirement → done
+        if (require.satisfy(childOutput.second)) {
+            return childOutput;
+        }
+
+        // 4b. Layer 1: skip LE when serial operator or ancestor in same pipeline
+        // Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip
+        if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) {
+            return childOutput;

Review Comment:
   This skip uses `isSerialNode()`, but the comment above
   `isSerialOperatorOnBe()` says that a node with `isSerialNode()==true` only
   runs with one BE task when `fragment.useSerialSource(context)` is true. In
   fragments where `useSerialSource` is false (for example
   `ignore_storage_data_distribution=false`, query cache, or NAAJ), a node such
   as a scalar aggregate or unpartitioned exchange can still return
   `isSerialNode()==true`, yet BE executes it with normal parallelism
   (`is_serial_operator=false` in thrift). This branch then skips a required
   LocalExchange even though BE would not consider the ancestor serial, so
   downstream hash/passthrough requirements can be silently dropped. The
   serial-ancestor propagation at line 1094 has the same issue. Please base
   these planner decisions on
   `isSerialOperatorOnBe(translatorContext.getConnectContext())` rather than the
   syntactic `isSerialNode()`, except in the explicitly documented
   heavy-op/local-fragment cases.
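
The mismatch described in this comment can be reproduced with a toy model. The sketch below is not the Doris `PlanNode` API: `Node`, `skipBySyntax`, and `skipByBeView` are hypothetical, and it assumes the simplification that a node is serial on BE only when it is syntactically serial and its fragment uses a serial source, leaving out the heavy-op/local-fragment exceptions.

```java
// Toy model of the two skip predicates; all names here are illustrative only.
final class SerialSkipSketch {

    static final class Node {
        final boolean serialNode;               // syntactic property, like isSerialNode()
        final boolean fragmentUsesSerialSource; // stands in for fragment.useSerialSource(context)

        Node(boolean serialNode, boolean fragmentUsesSerialSource) {
            this.serialNode = serialNode;
            this.fragmentUsesSerialSource = fragmentUsesSerialSource;
        }

        boolean isSerialNode() {
            return serialNode;
        }

        // Simplified assumption: serial on BE only if the fragment uses a serial source.
        boolean isSerialOperatorOnBe() {
            return serialNode && fragmentUsesSerialSource;
        }
    }

    /** The skip as written in the diff: based on the syntactic flag. */
    static boolean skipBySyntax(boolean hasSerialAncestor, Node node) {
        return hasSerialAncestor || node.isSerialNode();
    }

    /** The skip as the review suggests: based on what BE will actually run. */
    static boolean skipByBeView(boolean hasSerialAncestor, Node node) {
        return hasSerialAncestor || node.isSerialOperatorOnBe();
    }
}
```

For a scalar aggregate in a fragment without a serial source (`new Node(true, false)`), `skipBySyntax` skips the LocalExchange while `skipByBeView` does not, which is the case where a required exchange would be dropped.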



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

