cloud-fan commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3265910820


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -86,7 +86,8 @@ case class ClusteredDistribution(
     clustering: Seq[Expression],
     requireAllClusterKeys: Boolean = SQLConf.get.getConf(
       SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
-    requiredNumPartitions: Option[Int] = None) extends Distribution {
+    requiredNumPartitions: Option[Int] = None,
+    allowNullKeySpreading: Boolean = false) extends Distribution {

Review Comment:
   Worth a Scaladoc on this field describing the contract: it's a *permission*, 
not a requirement (an ordinary `HashPartitioning` still satisfies this 
distribution when the flag is `true`; the flag only weakens what the *default* 
partitioning produced by `createPartitioning` looks like). And it's the 
consumer-side knob — the partitioning-side marker (`NullAwareHashPartitioning` 
today, or a flag on `HashPartitioning` per the comment below) is what tells 
downstream operators they need to re-shuffle for strict `ClusteredDistribution`.
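   
   One possible wording for that Scaladoc, drafted on a simplified stand-in 
rather than the real class. `ClusteredDistributionDocSketch` is a hypothetical 
name and the clustering expressions are modelled as plain strings; only the 
contract text is the point here.
   
   ```scala
   /**
    * Simplified stand-in for `ClusteredDistribution`, used only to draft the field's Scaladoc.
    *
    * @param clustering key expressions, modelled here as plain strings (an assumption of this sketch)
    * @param allowNullKeySpreading a permission, not a requirement. When `true`, the default
    *   partitioning created for this distribution may place rows whose clustering keys contain
    *   NULL on any partition. An ordinary hash partitioning on the same keys still satisfies
    *   the distribution. This is the consumer-side knob; the partitioning that actually spreads
    *   NULL keys is what signals downstream operators that strict co-location is not guaranteed.
    */
   final case class ClusteredDistributionDocSketch(
       clustering: Seq[String],
       allowNullKeySpreading: Boolean = false)
   ```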



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -345,6 +390,42 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
     copy(from = from.copy(expressions = newChildren))
 }
 
+case class CoalescedNullAwareHashPartitioning(

Review Comment:
   Missing Scaladoc here (and on `NullAwareHashShuffleSpec` below). 
`CoalescedHashPartitioning` documents what it represents — worth matching that 
here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala:
##########
@@ -28,6 +29,15 @@ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Dist
 trait ShuffledJoin extends JoinCodegenSupport {
   def isSkewJoin: Boolean
 
+  private lazy val canSpreadNullJoinKeys: Boolean = {
+    // Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still
+    // produce NULL shuffle keys, but shuffled join execution already treats those rows as
+    // unmatched, so spreading them does not change the result.
+    val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
+    conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
+      isOuterJoin
+  }

Review Comment:
   Two improvements on this gate:
   
   **(1) Static nullability check.** Outer joins on non-nullable keys (PK/FK / 
NOT-NULL columns / post-`IsNotNull` filtered keys) gain nothing from the 
null-aware path but still pay both the runtime per-row `anyNull` check and the 
downstream re-shuffle cost from `outputPartitioning` no longer satisfying 
strict `ClusteredDistribution`. The analyzer already tracks 
`Expression.nullable` — use it here to make the mechanism a no-op when there's 
no NULL to spread.
   
   **(2) Reframe the comment around the structural reason.** The current 
comment only addresses the `<=>` corner. The real "why this PR exists" story is 
the preserved-side / pushdown-asymmetry argument — worth leading with that, 
with the `<=>` and `NullType` notes as a corollary.
   
   ```suggestion
     private lazy val canSpreadNullJoinKeys: Boolean = {
       // NULL keys on the preserved side of an outer join must be emitted but can never
       // satisfy `a.k = b.k` under three-valued logic, so their reducer placement is a
       // pure layout choice. Inner joins don't have this problem because
       // InferFiltersFromConstraints pushes IsNotNull(key) to both sides; for outer joins
       // that pushdown is blocked on the preserved side(s) -- which is exactly where
       // NULL-key skew can land.
       //
       // For null-safe equality (`<=>`), ExtractEquiJoinKeys rewrites to (coalesce, isNull)
       // shuffle keys, which are non-null for any concrete type. The NullType corner can
       // still produce NULL shuffle keys, but shuffled join execution already treats those
       // rows as unmatched, so spreading them does not change the result.
       val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
       conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
         isOuterJoin &&
         (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable))
     }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -403,6 +405,24 @@ object ShuffleExchangeExec {
       case h: HashPartitioning =>
         val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
         row => projection(row).getInt(0)
+      case h: NullAwareHashPartitioning =>
+        val partitionIdProjection =
+          UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
+        val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes)
+        var nullKeyPartition =
+          new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)
+        row => {
+          val joinKeys = joinKeyProjection(row)
+          if (joinKeys.anyNull()) {
+            // NULL join keys cannot match under ordinary equi-join semantics. Spread them
+            // round-robin within each map task so identical rows do not collapse to one reducer.
+            val partition = nullKeyPartition
+            nullKeyPartition = (nullKeyPartition + 1) % h.numPartitions
+            partition
+          } else {
+            partitionIdProjection(row).getInt(0)
+          }
+        }

Review Comment:
   Join keys are evaluated twice for non-NULL rows on this path: once via 
`joinKeyProjection(row)` to call `anyNull()`, and again via 
`partitionIdProjection(row).getInt(0)`, which re-evaluates the same expressions 
to compute the hash. For most expression shapes the second evaluation is cheap, 
but it is still redundant work in a per-row loop.
   
   Consider evaluating the keys once, checking `anyNull` on the materialized 
row, and then hashing directly from that row.
   
   Combined with the static-nullability gate at 
`ShuffledJoin.canSpreadNullJoinKeys` (which skips this path entirely when keys 
are statically non-nullable), the residual overhead becomes "check the null 
bitset once per row when at least one key is nullable" — about as low as this 
gets without adaptive observation of actual NULL frequency.
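   
   A minimal sketch of the single-evaluation idea, using plain Scala stand-ins 
rather than Spark's `UnsafeProjection`. The assumptions: the join keys are 
already materialized as an `Array[Any]` with `null` for SQL NULL, 
`NullSpreadingPartitioner` is a hypothetical name, and `MurmurHash3.seqHash` 
substitutes for whatever hash `partitionIdExpression` would compute.
   
   ```scala
   import scala.util.hashing.MurmurHash3

   // Hypothetical model of the per-map-task partition function; not Spark's actual code.
   final class NullSpreadingPartitioner(numPartitions: Int, seed: Int) {
     require(numPartitions > 0, "numPartitions must be positive")

     // Round-robin cursor for NULL-key rows, starting at a seed-derived offset.
     private var nextNullPartition: Int = Math.floorMod(seed, numPartitions)

     // `keys` is the row's join keys, evaluated exactly once by the caller.
     def partitionFor(keys: Array[Any]): Int = {
       if (keys.exists(_ == null)) {
         // NULL keys cannot match, so any partition is valid; rotate to spread them.
         val partition = nextNullPartition
         nextNullPartition = (nextNullPartition + 1) % numPartitions
         partition
       } else {
         // Hash the same materialized keys that were just null-checked.
         Math.floorMod(MurmurHash3.seqHash(keys.toSeq), numPartitions)
       }
     }
   }
   ```
   
   In the real code the equivalent would presumably hash the projected key row 
directly instead of going back to the input row, so the key expressions run 
once per row regardless of which branch is taken.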



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -324,6 +329,46 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
 }
 
+/**
+ * Represents a hash partitioning for equi-join inputs where rows with a NULL join key do not need
+ * to be co-located. Non-NULL join keys preserve the same partitioning contract as
+ * [[HashPartitioning]], while rows with any NULL join key may be spread across partitions.
+ */
+case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)

Review Comment:
   **Design alternative worth considering: a `spreadNullKeys: Boolean = false` 
field on `HashPartitioning` instead of a parallel type hierarchy.**
   
   The marker this carries is one bit ("NULL keys may be spread, so I don't 
deliver strict same-key co-location"). Encoding it as a parallel type means 
duplicating `hashKeyPositions`, `canCreatePartitioning`, `createPartitioning`, 
`numPartitions`, and (modulo the helper just extracted) `isCompatibleWith` in 
`NullAwareHashShuffleSpec`, plus reproducing `CoalescedHashPartitioning` as 
`CoalescedNullAwareHashPartitioning`, plus a new arm in every dispatcher 
(`ShuffleExchangeExec.prepareShuffleDependency`'s `part` and 
`getPartitionKeyExtractor`, `AQEShuffleReadExec.outputPartitioning`).
   
   With a flag:
   - `HashPartitioning.satisfies0` matches a strict `ClusteredDistribution` only 
when `!spreadNullKeys`; when `spreadNullKeys` is set, it matches only 
distributions with `allowNullKeySpreading=true`.
   - `HashShuffleSpec` carries the flag; one extra clause in `isCompatibleWith`.
   - `CoalescedHashPartitioning` already wraps a `HashPartitioning` — it 
inherits the flag transparently. No new coalesced class.
   - Dispatchers branch on `h.spreadNullKeys` instead of branching on type, so 
every existing `case h: HashPartitioning =>` site (`BucketingUtils`, 
`V1Writes`, `EnsureRequirements`, `AQEUtils`, `basicPhysicalOperators`, etc.) 
keeps working unchanged.
   
   The one argument for distinct types is visibility in the EXPLAIN string, and 
a one-line `toString` override on the flagged variant covers that.
   
   Separately on this class's Scaladoc: worth calling out that 
`NullAwareHashPartitioning` intentionally does NOT satisfy a strict 
`ClusteredDistribution` (NULL clustering keys aren't co-located). That's the 
non-obvious downstream contract — it's what forces downstream `GROUP BY` / 
window / strict equi-join to re-shuffle.
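   
   To make the flag-based contract concrete, here is a rough sketch of the 
`satisfies` rule on simplified stand-ins. `DistributionSketch`, 
`HashPartitioningSketch`, and `FlagSketch` are hypothetical names, expressions 
are modelled as strings, and key matching is reduced to "every partitioning key 
is a clustering key"; the real `satisfies0` has more cases than this.
   
   ```scala
   // Simplified stand-ins, not Spark's classes.
   final case class DistributionSketch(
       clustering: Seq[String],
       allowNullKeySpreading: Boolean = false)

   final case class HashPartitioningSketch(
       expressions: Seq[String],
       numPartitions: Int,
       spreadNullKeys: Boolean = false) {

     def satisfies(required: DistributionSketch): Boolean = {
       // Reduced key check, for illustration only.
       val keysMatch = expressions.nonEmpty && expressions.forall(required.clustering.contains)
       // A spreading partitioning is acceptable only where the consumer permits it;
       // a strict partitioning is acceptable either way.
       keysMatch && (!spreadNullKeys || required.allowNullKeySpreading)
     }
   }

   object FlagSketch {
     val strictDist = DistributionSketch(Seq("k"))
     val relaxedDist = DistributionSketch(Seq("k"), allowNullKeySpreading = true)
     val strictPart = HashPartitioningSketch(Seq("k"), 200)
     val spreadPart = HashPartitioningSketch(Seq("k"), 200, spreadNullKeys = true)
   }
   ```
   
   Under this rule `FlagSketch.spreadPart` fails `FlagSketch.strictDist`, which 
is the case that would force a re-shuffle, while `FlagSketch.strictPart` 
satisfies both distributions.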



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala:
##########
@@ -345,4 +346,249 @@ class OuterJoinSuite extends SharedSparkSession with SQLTestData {
     val df2 = join("SHUFFLE_MERGE(t1)")
     checkAnswer(df1, identity, df2.collect().toSeq)
   }
+
+  test("ordinary outer equi-join spreads NULL keys in shuffle partitioning") {
+    val nullableLeft = Seq(
+      (Integer.valueOf(1), "left-1"),
+      (null.asInstanceOf[Integer], "left-null-1"),
+      (null.asInstanceOf[Integer], "left-null-2")).toDF("k", "lv")
+    val nullableRight = Seq(
+      (Integer.valueOf(1), "right-1"),
+      (null.asInstanceOf[Integer], "right-null")).toDF("k", "rv")
+    val joinCondition = (nullableLeft("k") === nullableRight("k")).expr
+    val join = Join(nullableLeft.logicalPlan, nullableRight.logicalPlan,
+      LeftOuter, Some(joinCondition), JoinHint.NONE)
+
+    ExtractEquiJoinKeys.unapply(join).foreach {

Review Comment:
   `ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } 
}` silently passes if `unapply` returns `None` — all assertions live inside the 
foreach body, so a regression in `ExtractEquiJoinKeys` would make these tests 
report success without exercising anything. Prefer:
   
   ```scala
   val (_, leftKeys, rightKeys, boundCondition, _, _, _, _) =
     ExtractEquiJoinKeys.unapply(join).getOrElse(fail("Failed to extract equi-join keys"))
   withSQLConf(...) {
     ...
   }
   ```
   
   Applies to all six new tests in this file using this pattern.
   
   Separately: all six new tests use `SortMergeJoinExec`. 
`ShuffledHashJoinExec` also extends `ShuffledJoin` and supports `LeftOuter` / 
`RightOuter` / `FullOuter` (with the matching build side), so it picks up the 
same `canSpreadNullJoinKeys` behavior — worth at least one end-to-end test on 
that path too. And the `NullType` `<=>` corner case (which the safety argument 
for `<=>` rests on) isn't directly exercised by any of the new tests.
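   
   The silent-pass hazard is easy to reproduce outside Spark. In the sketch 
below, `extractKeys` is a hypothetical stand-in for 
`ExtractEquiJoinKeys.unapply`, and a thrown `AssertionError` stands in for 
ScalaTest's `fail(...)`; neither is the real API.
   
   ```scala
   object ExtractorGuardSketch {
     // Hypothetical extractor: returns None when the input is not of the form "left = right".
     def extractKeys(condition: String): Option[(String, String)] =
       condition.split("=").map(_.trim) match {
         case Array(l, r) if l.nonEmpty && r.nonEmpty => Some((l, r))
         case _ => None
       }

     // foreach style: the body, and every assertion in it, is skipped on None.
     // Returns how many times the assertion block actually ran.
     def checkedWithForeach(condition: String): Int = {
       var assertionsRun = 0
       extractKeys(condition).foreach { case (l, r) =>
         assert(l.nonEmpty && r.nonEmpty)
         assertionsRun += 1
       }
       assertionsRun
     }

     // getOrElse style: a failed extraction is itself a failure.
     def checkedWithGetOrElse(condition: String): Int = {
       val (l, r) = extractKeys(condition).getOrElse(
         throw new AssertionError(s"Failed to extract equi-join keys from: $condition"))
       assert(l.nonEmpty && r.nonEmpty)
       1
     }
   }
   ```
   
   With an input the extractor rejects, `checkedWithForeach` returns `0` 
without raising anything, whereas `checkedWithGetOrElse` throws.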



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

