cloud-fan commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3279795572
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -324,6 +333,49 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
}
+/**
+ * Represents a hash partitioning for equi-join inputs where rows with a NULL join key do not need
+ * to be co-located. Non-NULL join keys preserve the same partitioning contract as
+ * [[HashPartitioning]], while rows with any NULL join key may be spread across partitions. As a
+ * result, this partitioning intentionally does not satisfy a strict [[ClusteredDistribution]].
+ */
+case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+ extends HashPartitioningLike {
+
+ override def satisfies0(required: Distribution): Boolean = {
+ (required match {
+ case UnspecifiedDistribution => true
+ case AllTuples => numPartitions == 1
+ case _ => false
+ }) || {
+ // Stateful operators require strict NULL-key co-location and therefore cannot consume
+ // null-aware hash partitioning as a compatible clustered layout.
+ required match {
+ case c @ ClusteredDistribution(
+ requiredClustering, requireAllClusterKeys, _, allowNullKeySpreading)
+ if allowNullKeySpreading =>
+ if (requireAllClusterKeys) {
+ c.areAllClusterKeysMatched(expressions)
+ } else {
+ expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+ }
+ case _ => false
+ }
+ }
+ }
+
+ override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+ NullAwareHashShuffleSpec(this, distribution)
+
+ def partitionIdExpression: Expression = Pmod(
+ new CollationAwareMurmur3Hash(expressions), Literal(numPartitions)
+ )
Review Comment:
After the single-eval refactor in `ShuffleExchangeExec` (the `case h:
NullAwareHashPartitioning =>` branch in `getPartitionKeyExtractor`), this
method is no longer called anywhere. `ShuffleExchangeExec` builds an equivalent
`Pmod(CollationAwareMurmur3Hash(boundJoinKeys), Literal(n))` inline against the
projected key row instead of calling `h.partitionIdExpression`. Grep confirms
no external callers. Safe to delete the three-line def.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -2078,6 +2079,7 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED.key -> "true",
Review Comment:
Enabling `SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED` globally inside the
`"SPARK-33551: Do not use AQE shuffle read for repartition"` test reshapes what
that test verifies. The four `checkBHJ` / `checkSMJ` assertions below flip from
`optimizeOutRepartition = true` to `false` precisely because the outer join's
`NullAwareHashPartitioning` is intentionally not equivalent to the user's
`repartition` `HashPartitioning` — i.e., the test no longer exercises the
original SPARK-33551 regression scenario (optimize-out behavior on outer-join +
repartition under the default flag).
Two options:
1. **Split**: keep the existing test with the flag off (preserving
SPARK-33551 coverage), and add a new test asserting the flag-on inversion. This
documents that the new feature deliberately changes the optimize-out outcome.
2. **Branch in place**: run the same `checkBHJ` / `checkSMJ` calls twice
within the test — once with the flag off (original assertions) and once with
the flag on (new assertions).
Either works; as written, however, the change silently retires SPARK-33551's regression
coverage.
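A minimal, self-contained sketch of option 2 ("branch in place"), assuming a toy model rather than the real suite: the mutable map stands in for `SQLConf`, `withConf` is a hypothetical stand-in for `withSQLConf`, the key string is a placeholder (not the real value of `SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED.key`), and `optimizeOutRepartition()` is a hypothetical model of the outcome the four `checkBHJ` / `checkSMJ` calls assert. The point is only the shape: one loop over both flag values, each with its own expectation, so the original assertions keep running with the flag off.

```scala
import scala.collection.mutable

object BranchInPlaceSketch {
  // Hypothetical stand-in for SQLConf; the real test would use withSQLConf and
  // SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED.key instead.
  private val conf = mutable.Map.empty[String, String]
  val SpreadNullJoinKeys = "spreadNullJoinKeys" // placeholder key, not the real config name

  // Set the given keys for the duration of `body`, then restore the previous values.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally saved.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }

  // Hypothetical model of what the checkBHJ/checkSMJ calls assert:
  // the repartition is optimized out only when NULL-key spreading is off.
  def optimizeOutRepartition(): Boolean =
    !conf.get(SpreadNullJoinKeys).contains("true")

  // Option 2: run the same check under both flag values, each with its own expectation.
  def runBothModes(): Seq[(Boolean, Boolean)] =
    Seq(false, true).map { spread =>
      withConf(SpreadNullJoinKeys -> spread.toString) {
        val actual = optimizeOutRepartition()
        val expected = !spread // off: original SPARK-33551 assertion; on: the new inversion
        assert(actual == expected, s"spread=$spread: expected $expected, got $actual")
        spread -> actual
      }
    }
}
```

In the real suite the loop body would wrap the existing `checkBHJ` / `checkSMJ` calls, passing `optimizeOutRepartition = !spread`.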
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -349,6 +351,8 @@ object ShuffleExchangeExec {
// For HashPartitioning, the partitioning key is already a valid partition ID, as we use
// `HashPartitioning.partitionIdExpression` to produce partitioning key.
new PartitionIdPassthrough(n)
+ case NullAwareHashPartitioning(_, n) =>
+ new PartitionIdPassthrough(n)
Review Comment:
Nit: the parallel `HashPartitioning` case immediately above carries the
comment *"the partitioning key is already a valid partition ID, as we use
`HashPartitioning.partitionIdExpression` to produce partitioning key."* Worth
matching here so a reader who jumps to this case sees why
`PartitionIdPassthrough` is the right partitioner. Something like:
```
// The NullAware extractor below produces partition IDs directly:
// `Pmod(hash, n)` for non-NULL keys, a round-robin counter for NULL keys.
```
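To make the suggested comment concrete, here is a minimal sketch of a partition-ID extractor with the described behavior. It is a toy model, not Spark's `getPartitionKeyExtractor`. `Option` models a possibly-NULL key in the projected key row, `MurmurHash3.seqHash` stands in for `CollationAwareMurmur3Hash`, and the round-robin cursor starts at 0 for simplicity. Both branches already return an ID in `[0, numPartitions)`, which is why a passthrough partitioner is sufficient.

```scala
import scala.util.hashing.MurmurHash3

object NullAwarePartitionIdSketch {
  // Non-negative modulo, matching Pmod's result for a positive divisor.
  def pmod(a: Int, n: Int): Int = Math.floorMod(a, n)

  // Hypothetical extractor over the projected join-key row; None models a NULL key.
  // MurmurHash3.seqHash stands in for Spark's CollationAwareMurmur3Hash.
  final class Extractor(numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")
    private var nextNullPartition = 0 // round-robin cursor for NULL-key rows

    def partitionId(keys: Seq[Option[Any]]): Int =
      if (keys.exists(_.isEmpty)) {
        // Any NULL join key: co-location is not required, so spread round-robin.
        val id = nextNullPartition
        nextNullPartition = (nextNullPartition + 1) % numPartitions
        id
      } else {
        // All keys non-NULL: same contract as HashPartitioning (hash, then pmod).
        pmod(MurmurHash3.seqHash(keys.flatten), numPartitions)
      }
  }
}
```

Non-NULL rows with equal keys always land in the same partition, while NULL-key rows cycle through all partitions regardless of their other key values.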
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.