cloud-fan commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3279795572


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -324,6 +333,49 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
 }
 
+/**
+ * Represents a hash partitioning for equi-join inputs where rows with a NULL join key do not need
+ * to be co-located. Non-NULL join keys preserve the same partitioning contract as
+ * [[HashPartitioning]], while rows with any NULL join key may be spread across partitions. As a
+ * result, this partitioning intentionally does not satisfy a strict [[ClusteredDistribution]].
+ */
+case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+  extends HashPartitioningLike {
+
+  override def satisfies0(required: Distribution): Boolean = {
+    (required match {
+      case UnspecifiedDistribution => true
+      case AllTuples => numPartitions == 1
+      case _ => false
+    }) || {
+      // Stateful operators require strict NULL-key co-location and therefore cannot consume
+      // null-aware hash partitioning as a compatible clustered layout.
+      required match {
+        case c @ ClusteredDistribution(
+            requiredClustering, requireAllClusterKeys, _, allowNullKeySpreading)
+            if allowNullKeySpreading =>
+          if (requireAllClusterKeys) {
+            c.areAllClusterKeysMatched(expressions)
+          } else {
+            expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+          }
+        case _ => false
+      }
+    }
+  }
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+    NullAwareHashShuffleSpec(this, distribution)
+
+  def partitionIdExpression: Expression = Pmod(
+    new CollationAwareMurmur3Hash(expressions), Literal(numPartitions)
+  )

Review Comment:
   After the single-eval refactor in `ShuffleExchangeExec` (the
   `case h: NullAwareHashPartitioning =>` branch in `getPartitionKeyExtractor`),
   this method is no longer called anywhere. `ShuffleExchangeExec` builds an
   equivalent `Pmod(CollationAwareMurmur3Hash(boundJoinKeys), Literal(n))` inline
   against the projected key row instead of calling `h.partitionIdExpression`.
   Grep confirms no external callers. Safe to delete the three-line def.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -2078,6 +2079,7 @@ class AdaptiveQueryExecSuite
 
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED.key -> "true",

Review Comment:
   Enabling `SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED` for the whole
   `"SPARK-33551: Do not use AQE shuffle read for repartition"` test changes what
   that test verifies. The four `checkBHJ` / `checkSMJ` assertions below flip from
   `optimizeOutRepartition = true` to `false` precisely because the outer join's
   `NullAwareHashPartitioning` is intentionally not equivalent to the user's
   `repartition` `HashPartitioning`. In other words, the test no longer exercises
   the original SPARK-33551 regression scenario (optimize-out behavior on
   outer-join + repartition under the default flag).
   
   Two options:
   1. **Split**: keep the existing test with the flag off (preserving
      SPARK-33551 coverage), and add a new test asserting the flag-on inversion.
      This documents that the new feature deliberately changes the optimize-out
      outcome.
   2. **Branch in place**: run the same `checkBHJ` / `checkSMJ` calls twice
      within the test, once with the flag off (original assertions) and once
      with the flag on (new assertions).
   
   Either is fine; the current form silently retires SPARK-33551's regression 
coverage.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -349,6 +351,8 @@ object ShuffleExchangeExec {
         // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
         // `HashPartitioning.partitionIdExpression` to produce partitioning key.
         new PartitionIdPassthrough(n)
+      case NullAwareHashPartitioning(_, n) =>
+        new PartitionIdPassthrough(n)

Review Comment:
   Nit: the parallel `HashPartitioning` case immediately above carries the 
comment *"the partitioning key is already a valid partition ID, as we use 
`HashPartitioning.partitionIdExpression` to produce partitioning key."* Worth 
matching here so a reader who jumps to this case sees why 
`PartitionIdPassthrough` is the right partitioner. Something like:
   
   ```
   // The NullAware extractor below produces partition IDs directly:
   // `Pmod(hash, n)` for non-NULL keys, a round-robin counter for NULL keys.
   ```
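
   For readers who have not seen the extractor, the behavior that comment
   describes can be modeled in plain Scala without any Spark dependency. This is
   only a rough sketch: `NullAwarePartitionIdSketch`, `makeExtractor`, `hashKeys`
   and `pmod` are hypothetical names, `hashCode` stands in for the collation-aware
   Murmur3 hash, and starting the round-robin counter at 0 is an assumption rather
   than what the PR necessarily does.

   ```scala
   // Plain-Scala model of null-aware partition ID assignment.
   // All names are hypothetical; this is not the ShuffleExchangeExec code.
   object NullAwarePartitionIdSketch {
     // Non-negative modulo, which is what Pmod yields for a positive divisor.
     def pmod(a: Int, n: Int): Int = {
       val r = a % n
       if (r < 0) r + n else r
     }

     // Stand-in for the collation-aware Murmur3 hash (assumption).
     def hashKeys(keys: Seq[Any]): Int = keys.hashCode()

     // Returns a stateful extractor; a key of None models a NULL join key.
     def makeExtractor(numPartitions: Int): Seq[Option[Any]] => Int = {
       var nextNullTarget = 0 // assumed starting position for NULL-key rows
       keys =>
         if (keys.exists(_.isEmpty)) {
           // Any NULL join key: spread rows round-robin across partitions.
           val id = nextNullTarget
           nextNullTarget = (nextNullTarget + 1) % numPartitions
           id
         } else {
           // All keys non-NULL: same contract as hash partitioning.
           pmod(hashKeys(keys.map(_.get)), numPartitions)
         }
     }
   }
   ```

   In both branches the returned value is already a partition ID in
   `[0, numPartitions)`, which is why a passthrough partitioner is sufficient
   downstream.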



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

