github-actions[bot] commented on code in PR #64793:
URL: https://github.com/apache/doris/pull/64793#discussion_r3481028593


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -344,31 +344,65 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
             // For a non-serial probe without the flag: propagate the probe's distribution.
             outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null;
         } else if (isColocate() || isBucketShuffle()) {
-            // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution
-            // must be preserved on both inputs. A serial child on either side is handled the
-            // same way (serial exchange returns NOOP → enforceRequire() inserts the LE).
-            probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
-            // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
-            // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
-            // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
-            // build data by bucket to match the probe side's bucket distribution.
-            // The serial exchange returns NOOP, so enforceRequire() will insert a
-            // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
-            // bottleneck avoidance).
-            buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            outputType = AddLocalExchange.resolveExchangeType(
-                    LocalExchangeTypeRequire.requireBucketHash());
+            if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) {
+                // Bucket → local-hash parallelism upgrade (bucket-to-hash upgrade): the fragment
+                // has noticeably more instances than buckets-with-data (see
+                // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs
+                // bucket alignment — re-distribute both sides by their distribute keys with
+                // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism
+                // instead of being capped at bucket count.  The LE keys come from
+                // childrenDistributeExprLists (pairwise-aligned per side, a subset of the
+                // equi-join keys), so both sides keep hashing the same values and the
+                // per-instance build/probe pairing stays correct.
+                //
+                // requireSpecific (not requireHash) on purpose: the children's
+                // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE
+                // is inserted and the join stays bucket-capped.
+                //
+                // Mark direct children so a stacked bucket join below keeps its BUCKET
+                // requires: if it also upgraded, its LOCAL hash output (keyed by ITS join
+                // keys) would type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH) and
+                // suppress the LE that re-aligns data to OUR keys → wrong results.
+                translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);

Review Comment:
   This marker only protects a bucket join that is a direct child. If a distribution-preserving wrapper sits between two bucket joins, for example `HashJoin -> SelectNode -> HashJoin`, the marker is attached to the wrapper, but `SelectNode.enforceAndDeriveLocalExchange` forwards the same `requireSpecific(LOCAL_EXECUTION_HASH_SHUFFLE)` to its child and returns the child's output type. The lower `HashJoinNode` is therefore unmarked, can upgrade, and returns `LOCAL_EXECUTION_HASH_SHUFFLE`. Because `RequireSpecific.satisfy` compares only the exchange type, the upper join treats that output as already aligned and skips the re-hash LE that should repartition on the upper join keys. When the inner and outer join keys differ, one input stays partitioned on the inner join's keys while the other is partitioned on the outer join's keys, so rows that should match can land in different instances and the join misses them.

   Please either propagate the bucket-upgraded-ancestor state through transparent wrappers or make the local-hash requirement key-aware, and add test coverage for a wrapped stacked bucket-join shape.
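
   A minimal, self-contained Java sketch of the failure mode and of the key-aware alternative. Every name in it (`ExchangeType`, `Output`, `satisfyByTypeOnly`, `satisfyKeyAware`, `instanceFor`, and the `t1.a`/`t1.b` keys) is a hypothetical stand-in, not Doris's `LocalExchangeTypeRequire`/`RequireSpecific` API, and `instanceFor` assumes a simple hash-mod placement rather than the engine's real hash function.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical toy model of the satisfy check discussed above; none of these names are Doris classes.
public class KeyAwareRequireSketch {

    enum ExchangeType { BUCKET_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE }

    // What a node reports to its parent: the exchange type of its output and the keys it is hashed on.
    record Output(ExchangeType type, List<String> hashKeys) { }

    // Type-only check: the behaviour the comment attributes to RequireSpecific.satisfy.
    static boolean satisfyByTypeOnly(ExchangeType required, Output child) {
        return child.type() == required;
    }

    // Key-aware check: the exchange type must match AND the child must be hashed on the required keys.
    static boolean satisfyKeyAware(ExchangeType required, List<String> requiredKeys, Output child) {
        return child.type() == required && Objects.equals(child.hashKeys(), requiredKeys);
    }

    // Simplified (assumed) row placement: a hash-shuffled row goes to instance hash(key) mod instances.
    static int instanceFor(int keyValue, int instances) {
        return Math.floorMod(Integer.hashCode(keyValue), instances);
    }

    public static void main(String[] args) {
        // The upgraded lower join hashes its output on its own key t1.a; a transparent
        // wrapper such as SelectNode forwards that output to the upper join unchanged.
        Output wrapped = new Output(ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, List.of("t1.a"));
        // The upper join needs both inputs hashed on its own key, t1.b.
        List<String> upperKeys = List.of("t1.b");

        // Type-only: accepted, so no re-hash LE is inserted on this side.
        System.out.println("type-only satisfied: "
                + satisfyByTypeOnly(ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, wrapped));
        // Key-aware: rejected, so an LE keyed on t1.b would be inserted.
        System.out.println("key-aware satisfied: "
                + satisfyKeyAware(ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, upperKeys, wrapped));

        // Why skipping the LE loses matches: a probe row with a = 1, b = 7 stays where the
        // lower join put it (hashed on a), while the build row it should match is hashed on b = 7.
        int instances = 4;
        System.out.println("probe row instance: " + instanceFor(1, instances));
        System.out.println("build row instance: " + instanceFor(7, instances));
    }
}
```

   Running `main` prints `true` for the type-only check and `false` for the key-aware one, then instances `1` and `3` for the two rows: once the re-hash LE is skipped, rows that should join are processed by different instances. A key-aware require would catch the wrapped shape no matter how many transparent nodes sit in between, whereas propagating the marker relies on every wrapper type forwarding it.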



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.