924060929 commented on code in PR #64793:
URL: https://github.com/apache/doris/pull/64793#discussion_r3481432797


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -344,31 +344,65 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
             // For a non-serial probe without the flag: propagate the probe's distribution.
             outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null;
         } else if (isColocate() || isBucketShuffle()) {
-            // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution
-            // must be preserved on both inputs. A serial child on either side is handled the
-            // same way (serial exchange returns NOOP → enforceRequire() inserts the LE).
-            probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
-            // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
-            // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
-            // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
-            // build data by bucket to match the probe side's bucket distribution.
-            // The serial exchange returns NOOP, so enforceRequire() will insert a
-            // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
-            // bottleneck avoidance).
-            buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            outputType = AddLocalExchange.resolveExchangeType(
-                    LocalExchangeTypeRequire.requireBucketHash());
+            if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) {
+                // Bucket → local-hash parallelism upgrade (bucket-to-hash upgrade): the fragment
+                // has noticeably more instances than buckets-with-data (see
+                // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs
+                // bucket alignment — re-distribute both sides by their distribute keys with
+                // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism
+                // instead of being capped at bucket count.  The LE keys come from
+                // childrenDistributeExprLists (pairwise-aligned per side, a subset of the
+                // equi-join keys), so both sides keep hashing the same values and the
+                // per-instance build/probe pairing stays correct.
+                //
+                // requireSpecific (not requireHash) on purpose: the children's
+                // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE
+                // is inserted and the join stays bucket-capped.
+                //
+                // Mark direct children so a stacked bucket join below keeps its BUCKET
+                // requires: if it also upgraded, its LOCAL hash output (keyed by ITS join
+                // keys) would type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH) and
+                // suppress the LE that re-aligns data to OUR keys → wrong results.
+                translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);

Review Comment:
   Thanks, I looked into this carefully. The marker-propagation gap you
spotted is real, but I do not think it can produce a wrong result: the failing
scenario needs the inner and outer joins' **probe-side** keys to differ, and
for two stacked bucket-shuffle joins in the **same pooled fragment** they
cannot.
   
   The fragment's distribution is the bucketed scan's (say `fact` bucketed by 
`k`). Any bucket-shuffle join in that fragment must shuffle its other side to 
match `fact`'s buckets, i.e. join on `fact.k`. So every stacked level's probe 
side is distributed by `fact.k`. `EXPLAIN VERBOSE` on a 3-table stacked bucket 
join confirms it:
   
   ```
   6:VHASH JOIN  join op: INNER JOIN(BUCKET_SHUFFLE)   -- outer
   |  distribute expr lists: k[#13]    -- probe side
   |  distribute expr lists: k[#3]     -- build side (p2.k)
   5:VHASH JOIN  join op: INNER JOIN(BUCKET_SHUFFLE)   -- inner
   |  distribute expr lists: k[#10]    -- probe side
   |  distribute expr lists: k[#7]     -- build side (p1.k)
   ```
   
   Outer probe key `k[#13]` and inner probe key `k[#10]` are the same column
(`fact.k`) carried up under different slot ids: same values, same hash
distribution. So once the inner join upgrades, its
`LOCAL_EXECUTION_HASH_SHUFFLE(fact.k)` output is exactly what the outer probe
needs; the outer's re-align LE would re-hash `fact.k` by `fact.k`, which is
redundant. A transparent wrapper that lets that redundant LE be skipped
therefore cannot slice rows by a different key, because they are already keyed
by `fact.k`.
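
   To make the redundancy argument concrete, here is a minimal sketch in plain
   Java. It is not Doris code: `RehashSketch`, `instanceOf`, `shuffle`, and
   `rehashIsNoOp` are hypothetical names, and `Long.hashCode` with
   `Math.floorMod` is only a stand-in for whatever hash the local exchange
   really uses. It assumes routing depends only on the key value and the
   instance count, which is why the slot id a column is carried under does not
   matter.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch, not Doris code; the hash below is an assumed stand-in.
   class RehashSketch {
       // Route a key value to one of `instances` local instances.
       static int instanceOf(long key, int instances) {
           return Math.floorMod(Long.hashCode(key), instances);
       }

       // Distribute key values into per-instance lists by hashing the value.
       static List<List<Long>> shuffle(List<Long> keys, int instances) {
           List<List<Long>> out = new ArrayList<>();
           for (int i = 0; i < instances; i++) {
               out.add(new ArrayList<>());
           }
           for (long k : keys) {
               out.get(instanceOf(k, instances)).add(k);
           }
           return out;
       }

       // True if re-hashing every row by the same key leaves it in place,
       // i.e. a second exchange on that key would move nothing.
       static boolean rehashIsNoOp(List<List<Long>> partitions) {
           int n = partitions.size();
           for (int i = 0; i < n; i++) {
               for (long k : partitions.get(i)) {
                   if (instanceOf(k, n) != i) {
                       return false;
                   }
               }
           }
           return true;
       }
   }
   ```

   Under those assumptions, `rehashIsNoOp(shuffle(keys, n))` holds for any key
   list, which mirrors the claim that the outer join's re-align LE on `fact.k`
   has nothing left to move after the inner join's upgrade.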
   
   The build sides do differ (`p1.k` vs `p2.k`), but each join's build arrives
via its own cross-fragment network exchange (independently bucket-shuffled to
`fact`'s buckets). The build inputs are not stacked on each other, and the
marker does not cross fragment boundaries, so each build is aligned on its own.
   
   One caveat on the proposed fix: propagating the ancestor flag through the
wrapper would actually produce a worse plan. With the flag set, the lower join
claims NOOP, and the LE is then inserted at the *wrapper's* enforceRequire
using the wrapper's (empty) distribute exprs. The result is a keyless LOCAL
hash exchange between the wrapper and the lower join, rather than the outer
join re-aligning on its own keys.
   
   So I am treating the marker as defensive (the forced re-align is redundant
given the shared bucket key) and not changing it here. Happy to revisit if
there is a concrete shape where two stacked bucket-shuffle joins end up with
different probe-side keys; I could not construct one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

