924060929 commented on code in PR #64793:
URL: https://github.com/apache/doris/pull/64793#discussion_r3481432797
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java:
##########
@@ -344,31 +344,65 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
// For a non-serial probe without the flag: propagate the probe's distribution.
outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null;
} else if (isColocate() || isBucketShuffle()) {
- // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution
- // must be preserved on both inputs. A serial child on either side is handled the
- // same way (serial exchange returns NOOP → enforceRequire() inserts the LE).
- probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
- // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not
- // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared
- // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1
- // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes
- // build data by bucket to match the probe side's bucket distribution.
- // The serial exchange returns NOOP, so enforceRequire() will insert a
- // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops
- // bottleneck avoidance).
- buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
- outputType = AddLocalExchange.resolveExchangeType(
- LocalExchangeTypeRequire.requireBucketHash());
+ if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) {
+ // Bucket → local-hash parallelism upgrade (bucket-to-hash upgrade): the fragment
+ // has noticeably more instances than buckets-with-data (see
+ // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs
+ // bucket alignment — re-distribute both sides by their distribute keys with
+ // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism
+ // instead of being capped at bucket count. The LE keys come from
+ // childrenDistributeExprLists (pairwise-aligned per side, a subset of the
+ // equi-join keys), so both sides keep hashing the same values and the
+ // per-instance build/probe pairing stays correct.
+ //
+ // requireSpecific (not requireHash) on purpose: the children's
+ // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE
+ // is inserted and the join stays bucket-capped.
+ //
+ // Mark direct children so a stacked bucket join below keeps its BUCKET
+ // requires: if it also upgraded, its LOCAL hash output (keyed by ITS join
+ // keys) would type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH) and
+ // suppress the LE that re-aligns data to OUR keys → wrong results.
+
translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);
Review Comment:
Thanks — I looked into this carefully. The marker-propagation gap you
spotted is real, but I do not think it can produce a wrong result, because the
scenario needs the inner and outer joins' **probe-side** keys to differ, and
for two stacked bucket-shuffle joins in the **same pooled fragment** they
cannot.
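To make the failure mode concrete, here is a toy model of the require check as the diff comment describes it. It assumes a hypothetical, simplified `ExchangeType` enum and `Output` record, not Doris's real `LocalExchangeType`/`LocalExchangeTypeRequire` classes. It captures one property only: `requireSpecific` is satisfied by the distribution *type*, so an upgraded child suppresses the re-align LE whatever keys it hashed by. Whether that is harmful therefore depends entirely on whether those keys match the outer join's keys.

```java
import java.util.List;

final class RequireModel {
    // Hypothetical, simplified subset of the exchange types named in the diff.
    enum ExchangeType { NOOP, PASSTHROUGH, BUCKET_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE }

    // A child's output: its distribution type plus the keys it is hashed by.
    // The keys are carried only to show that the checks below ignore them.
    record Output(ExchangeType type, List<String> keys) {}

    // requireHash(): any hash distribution satisfies it.
    static boolean satisfiesRequireHash(Output child) {
        return child.type() == ExchangeType.BUCKET_HASH_SHUFFLE
                || child.type() == ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE;
    }

    // requireSpecific(t): only exactly t satisfies it (type-only, keys ignored).
    static boolean satisfiesRequireSpecific(Output child, ExchangeType wanted) {
        return child.type() == wanted;
    }

    // In this model an LE is inserted only when the require is not satisfied.
    static boolean needsLocalExchange(Output child, ExchangeType wanted) {
        return !satisfiesRequireSpecific(child, wanted);
    }

    public static void main(String[] args) {
        ExchangeType want = ExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE;
        Output bucketed = new Output(ExchangeType.BUCKET_HASH_SHUFFLE, List.of("fact.k"));
        Output upgradedSameKey = new Output(want, List.of("fact.k"));
        Output upgradedOtherKey = new Output(want, List.of("p1.k"));

        // requireHash() would accept the bucketed child: no LE, join stays bucket-capped.
        System.out.println(satisfiesRequireHash(bucketed));             // true
        // requireSpecific(LOCAL) rejects it, so the upgrade LE is inserted.
        System.out.println(needsLocalExchange(bucketed, want));         // true
        // An upgraded child suppresses the LE regardless of its keys.
        System.out.println(needsLocalExchange(upgradedSameKey, want));  // false
        System.out.println(needsLocalExchange(upgradedOtherKey, want)); // false
    }
}
```

The last two lines are the crux of the thread. The suppression is harmless for `fact.k` and would only be wrong for a genuinely different key such as `p1.k`.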
The fragment's distribution is the bucketed scan's (say `fact` bucketed by
`k`). Any bucket-shuffle join in that fragment must shuffle its other side to
match `fact`'s buckets, i.e. it must join on `fact.k`. So every stacked level's
probe side is distributed by `fact.k`. `EXPLAIN VERBOSE` on a 3-table stacked
bucket join confirms it:
```
6:VHASH JOIN join op: INNER JOIN(BUCKET_SHUFFLE) -- outer
| distribute expr lists: k[#13] -- probe side
| distribute expr lists: k[#3] -- build side (p2.k)
5:VHASH JOIN join op: INNER JOIN(BUCKET_SHUFFLE) -- inner
| distribute expr lists: k[#10] -- probe side
| distribute expr lists: k[#7] -- build side (p1.k)
```
Outer probe key `k[#13]` and inner probe key `k[#10]` are the same column
(`fact.k`) carried up under different slot IDs: same values, same hash
distribution. So once the inner join upgrades, its
`LOCAL_EXECUTION_HASH_SHUFFLE(fact.k)` output is exactly what the outer probe
needs. The outer join's re-align LE would re-hash by `fact.k` rows that are
already hashed by `fact.k`, so it is redundant. A transparent wrapper that lets
that redundant LE be skipped therefore cannot leave rows sliced by a different
key, because they are already keyed by `fact.k`.
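The redundancy can be checked with a small model. Treat the local exchange as a deterministic hash of the key value modulo the instance count (a hypothetical stand-in; Doris's real hash function differs), then count how many rows a second, re-align exchange would move. The same column under two slot IDs moves nothing. A genuinely different column, included only for contrast, does move rows.

```java
final class RealignCheck {
    // Hypothetical stand-in for the local-exchange hash. The argument only
    // needs it to be a deterministic function of the key value.
    static int instanceOf(long key, int instances) {
        return Math.floorMod(Long.hashCode(key), instances);
    }

    // Counts rows that a re-align LE keyed on outerKeys would send to a different
    // instance than the inner join's upgraded LE (keyed on innerKeys) did.
    static int rowsMovedByRealign(long[] innerKeys, long[] outerKeys, int instances) {
        int moved = 0;
        for (int i = 0; i < innerKeys.length; i++) {
            if (instanceOf(innerKeys[i], instances) != instanceOf(outerKeys[i], instances)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        int n = 1000;
        int instances = 8;
        long[] slot10 = new long[n];   // fact.k under the inner join's slot k[#10]
        long[] slot13 = new long[n];   // fact.k carried up as the outer join's slot k[#13]
        long[] otherKey = new long[n]; // a genuinely different column, for contrast
        for (int i = 0; i < n; i++) {
            slot10[i] = i;
            slot13[i] = i;             // same values: only the slot id differs
            otherKey[i] = i + 1;
        }
        System.out.println(rowsMovedByRealign(slot10, slot13, instances));   // 0
        System.out.println(rowsMovedByRealign(slot10, otherKey, instances)); // 1000
    }
}
```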
The build sides do differ (`p1.k` vs `p2.k`), but each join's build side
arrives via its own cross-fragment network exchange (independently
bucket-shuffled to `fact`'s buckets). There is no stacking on the build side,
and the marker does not cross fragment boundaries, so each build side is
aligned on its own.
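The build-side argument can be sketched the same way. If each build side is bucketed with `fact`'s bucket function on its own join key, joining bucket by bucket gives the same result as one global join, whether that key is `p1.k` or `p2.k`. The bucket function, data, and bucket count below are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BucketShuffleCheck {
    // Hypothetical bucket function standing in for fact's real bucketing hash.
    static int bucketOf(long key, int buckets) {
        return Math.floorMod(Long.hashCode(key), buckets);
    }

    // Inner-join match count computed over all rows at once.
    static long globalJoin(List<Long> probeKeys, List<Long> buildKeys) {
        Map<Long, Long> counts = new HashMap<>();
        for (long k : buildKeys) counts.merge(k, 1L, Long::sum);
        long matches = 0;
        for (long k : probeKeys) matches += counts.getOrDefault(k, 0L);
        return matches;
    }

    // Splits keys into per-bucket lists using the shared bucket function.
    static List<List<Long>> split(List<Long> keys, int buckets) {
        List<List<Long>> out = new ArrayList<>();
        for (int b = 0; b < buckets; b++) out.add(new ArrayList<>());
        for (long k : keys) out.get(bucketOf(k, buckets)).add(k);
        return out;
    }

    // Same join, but each bucket only sees its own slice of both sides.
    static long perBucketJoin(List<Long> probeKeys, List<Long> buildKeys, int buckets) {
        List<List<Long>> probe = split(probeKeys, buckets);
        List<List<Long>> build = split(buildKeys, buckets);
        long matches = 0;
        for (int b = 0; b < buckets; b++) matches += globalJoin(probe.get(b), build.get(b));
        return matches;
    }

    public static void main(String[] args) {
        List<Long> fact = new ArrayList<>(), p1 = new ArrayList<>(), p2 = new ArrayList<>();
        for (long i = 0; i < 500; i++) {
            fact.add(i % 97);
            p1.add(i % 53);
            p2.add(i % 71);
        }
        int buckets = 16;
        // p1 and p2 are different columns, but each is shuffled to fact's buckets on its own.
        System.out.println(globalJoin(fact, p1) == perBucketJoin(fact, p1, buckets)); // true
        System.out.println(globalJoin(fact, p2) == perBucketJoin(fact, p2, buckets)); // true
    }
}
```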
One caveat on the proposed fix: propagating the ancestor flag through the
wrapper would actually produce a worse plan. With the flag set, the lower join
claims NOOP, and the LE then gets inserted at the *wrapper's* `enforceRequire()`
using the wrapper's (empty) distribute exprs, i.e. a keyless LOCAL hash
exchange between the wrapper and the lower join, rather than the outer join
re-aligning on its own keys.
So I am treating the marker as defensive (the forced re-align is redundant
given the shared bucket key) and not changing it here. Happy to revisit if
there is a concrete shape where two stacked bucket-shuffle joins end up with
different probe-side keys — I could not construct one.
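For reference, a toy model reproduces the row-loss argument in the removed diff comment, which explains why a serial build child under BUCKET_SHUFFLE uses `requireBucketHash()` rather than `requirePassToOne()`. The model assumes hypothetical per-task hash tables with no sharing between tasks, which the comment says is the case for BUCKET_SHUFFLE (unlike BROADCAST), and a stand-in bucket function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PassToOneCheck {
    // Hypothetical bucket-to-task assignment standing in for the real bucket hash.
    static int taskOf(long key, int tasks) {
        return Math.floorMod(Long.hashCode(key), tasks);
    }

    // Each task builds its own hash table from the build rows routed to it and
    // probes it only with its own bucket's probe rows. Returns matched pairs.
    static long join(List<Long> probe, List<Long> build, int tasks, boolean passToOne) {
        List<Map<Long, Long>> tables = new ArrayList<>();
        for (int t = 0; t < tasks; t++) tables.add(new HashMap<>());
        for (long k : build) {
            // PASS_TO_ONE sends every build row to task 0; bucket hash routes by key.
            int t = passToOne ? 0 : taskOf(k, tasks);
            tables.get(t).merge(k, 1L, Long::sum);
        }
        long matches = 0;
        for (long k : probe) {
            // Probe rows stay bucket-distributed in both cases.
            matches += tables.get(taskOf(k, tasks)).getOrDefault(k, 0L);
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Long> probe = new ArrayList<>(), build = new ArrayList<>();
        for (long k = 0; k < 100; k++) {
            probe.add(k);
            build.add(k);
        }
        System.out.println(join(probe, build, 4, false)); // 100
        System.out.println(join(probe, build, 4, true));  // 25
    }
}
```

With 4 tasks and keys 0..99, bucket-hash routing matches all 100 probe rows. PASS_TO_ONE matches only the 25 rows whose bucket maps to task 0 under this stand-in hash, because tasks 1..3 probe empty tables.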
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]