peter-toth commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3265009056
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -345,6 +390,42 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
copy(from = from.copy(expressions = newChildren))
}
+case class CoalescedNullAwareHashPartitioning(
+ from: NullAwareHashPartitioning,
+ partitions: Seq[CoalescedBoundary]) extends HashPartitioningLike {
+
+ override def expressions: Seq[Expression] = from.expressions
+
+ override def satisfies0(required: Distribution): Boolean = {
Review Comment:
This body is identical to `NullAwareHashPartitioning.satisfies0` at line 340
— same outer `UnspecifiedDistribution`/`AllTuples`/`_ => false` match, same
`ClusteredDistribution` inner match guarded on `allowNullKeySpreading`, same
`requireAllClusterKeys` branching. This is the same kind of duplication
addressed elsewhere in this PR by extracting
`HashShuffleSpecCompatibility.isCompatible` (lines 944-955).
Two cleaner shapes:
- Lift the inner block to a private helper shared by both classes, e.g.
`private def nullAwareSatisfies0(exprs, n, required)`.
- Or just delegate: since boundaries don't change satisfaction semantics for
the `allowNullKeySpreading` contract,
`CoalescedNullAwareHashPartitioning.satisfies0(required)` is essentially
`from.satisfies0(required)` *except* for the `AllTuples` case where
`numPartitions` differs — that single divergence is easy to handle inline.
Side note: both overrides skip the `StatefulOpClusteredDistribution` case
that `HashPartitioningLike.satisfies0` handles. The skipped case is currently
unreachable (streaming joins use `StatefulOpClusteredDistribution`, not
`ClusteredDistribution`, so they never opt into `allowNullKeySpreading`), but a
one-line comment that the omission is deliberate would help the next reader.
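To make the shared-helper option concrete, here is a minimal, self-contained sketch. It uses toy stand-ins rather than the real Catalyst classes: expressions are plain strings, boundaries are `(start, end)` pairs, and `nullAwareSatisfies0` plus the object name `SatisfiesSketch` are hypothetical. The match body mirrors the shape described above and is not copied from the PR.

```scala
object SatisfiesSketch {
  // Toy stand-ins for the Catalyst types; expressions are plain strings here.
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case object AllTuples extends Distribution
  final case class ClusteredDistribution(
      clustering: Seq[String],
      allowNullKeySpreading: Boolean = false,
      requireAllClusterKeys: Boolean = false) extends Distribution

  // Hypothetical shared helper holding the logic both classes duplicate today.
  def nullAwareSatisfies0(
      exprs: Seq[String],
      numPartitions: Int,
      required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case AllTuples => numPartitions == 1
    case c: ClusteredDistribution if c.allowNullKeySpreading =>
      if (c.requireAllClusterKeys) exprs == c.clustering
      else exprs.forall(e => c.clustering.contains(e))
    case _ => false
  }

  final case class NullAwareHashPartitioning(
      expressions: Seq[String],
      numPartitions: Int) {
    def satisfies0(required: Distribution): Boolean =
      nullAwareSatisfies0(expressions, numPartitions, required)
  }

  // Only the number of boundaries matters for satisfaction in this model.
  final case class CoalescedNullAwareHashPartitioning(
      from: NullAwareHashPartitioning,
      partitions: Seq[(Int, Int)]) {
    def numPartitions: Int = partitions.length
    def satisfies0(required: Distribution): Boolean =
      nullAwareSatisfies0(from.expressions, numPartitions, required)
  }
}
```

Passing `numPartitions` explicitly keeps the `AllTuples` divergence in one visible place, which is the main reason to prefer the helper over delegating to `from.satisfies0`.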
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala:
##########
@@ -28,6 +29,15 @@ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Dist
trait ShuffledJoin extends JoinCodegenSupport {
def isSkewJoin: Boolean
+ private lazy val canSpreadNullJoinKeys: Boolean = {
Review Comment:
The gate opts in based on join type alone, ignoring whether the shuffle keys
are actually nullable. For an outer join on non-nullable keys (e.g. `f.k = d.k`
where both `k` are NOT NULL — common after a NOT NULL filter or on
schema-non-null columns), the new path:
1. Adds a per-row `joinKeys.anyNull()` check in
`ShuffleExchangeExec.getPartitionKeyExtractor` that always returns false.
2. Produces `NullAwareHashPartitioning` as the join's output partitioning,
which doesn't satisfy ordinary `ClusteredDistribution`. The
`AdaptiveQueryExecSuite` diff in this PR (`optimizeOutRepartition = false`
cases around lines 2079-2127) shows the cost — a downstream
`df.repartition($"b")` is no longer collapsed even though the underlying
NULL-skew problem can never have existed.
Two options worth considering:
- Gate also on `leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)`
so a non-nullable-key outer join falls back to plain `HashPartitioning`.
- If the simpler shape is preferred, add a sentence to the lazy val's
comment explicitly calling out the trade-off (skew reduction vs. potentially
unnecessary downstream re-shuffle / lost `optimizeOutRepartition`) so future
readers don't read it as an oversight.
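A rough sketch of the first option, using toy types instead of Catalyst expressions. `Key`, the `JoinType` hierarchy, and the set of join types that opt in are assumptions made for illustration; the PR's actual join-type check is not shown in this hunk.

```scala
object NullKeyGateSketch {
  // Toy stand-in for a join key expression; only nullability matters here.
  final case class Key(name: String, nullable: Boolean)

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Assumption: outer joins are the types that opt in today.
  private def isOuter(joinType: JoinType): Boolean = joinType match {
    case LeftOuter | RightOuter | FullOuter => true
    case Inner => false
  }

  // Opt in only when the join type qualifies AND some key can actually be NULL.
  def canSpreadNullJoinKeys(
      joinType: JoinType,
      leftKeys: Seq[Key],
      rightKeys: Seq[Key]): Boolean =
    isOuter(joinType) &&
      (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable))
}
```

With this gate, the `f.k = d.k` outer join on NOT NULL keys returns `false` and would keep plain `HashPartitioning`, so a downstream repartition on the same key could still be collapsed.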
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala:
##########
@@ -59,6 +59,27 @@ class ExchangeSuite extends SharedSparkSession {
)
}
+ test("null-aware hash shuffle spreads identical NULL keys from one mapper") {
+ val input = Seq.fill(64)(Tuple1(null.asInstanceOf[Integer])).toDF("k").coalesce(1)
+ val plan = input.queryExecution.executedPlan
+ val exchange = ShuffleExchangeExec(NullAwareHashPartitioning(plan.output, 4), plan)
+ val partitionSizes = exchange.execute().collectPartitions().map(_.length)
+
+ assert(partitionSizes.sorted === Array(16, 16, 16, 16))
+ }
+
+ test("null-aware hash shuffle preserves retry determinism with local sorting") {
+ withSQLConf(SQLConf.SORT_BEFORE_REPARTITION.key -> "true") {
Review Comment:
Retry determinism for the new partitioning has two paths, and only one is
covered. With `SORT_BEFORE_REPARTITION = true`, the local sort makes row order
deterministic. With `false`, the code in `ShuffleExchangeExec` (line ~497-498)
instead relies on `isOrderSensitive = true` to propagate the parent's
determinism level, and nothing here exercises that unsorted path.
A second test running the same
`ShuffleExchangeExec(NullAwareHashPartitioning, ...)` with
`SORT_BEFORE_REPARTITION = false` and asserting that `outputDeterministicLevel`
inherits the parent's level would catch a future regression that drops
`isNullAwareHashPartitioning` from the `isOrderSensitive` clause (which would
silently make retries unsafe for the unsorted path). Mirroring the structure of
the existing test is enough — no new infrastructure required.
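To illustrate what such a test would pin down, here is a small standalone model, not Spark code. The shape of the `isOrderSensitive` clause is an assumption based on the description above, and `outputLevel` is a simplified version of how I understand an order-sensitive map step to treat its parent's level: it passes the level through, except that an unordered parent becomes indeterminate.

```scala
object RetryDeterminismSketch {
  // Toy counterpart of the RDD determinism levels.
  sealed trait Level
  case object Determinate extends Level
  case object Unordered extends Level
  case object Indeterminate extends Level

  // Assumed shape of the clause: the partitioning depends on row order,
  // and no local sort has been inserted to fix that order.
  def isOrderSensitive(
      isRoundRobin: Boolean,
      isNullAwareHashPartitioning: Boolean,
      sortBeforeRepartition: Boolean): Boolean =
    (isRoundRobin || isNullAwareHashPartitioning) && !sortBeforeRepartition

  // Simplified propagation: an order-sensitive step over an unordered parent
  // cannot promise the same output on retry; otherwise the parent's level stands.
  def outputLevel(parent: Level, orderSensitive: Boolean): Level =
    if (orderSensitive && parent == Unordered) Indeterminate else parent
}
```

In this model, dropping `isNullAwareHashPartitioning` from the clause would make the unsorted path report an unordered parent as `Unordered` instead of `Indeterminate`, which is the regression the second test is meant to catch.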
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]