sunchao commented on code in PR #55927:
URL: https://github.com/apache/spark/pull/55927#discussion_r3268206193


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala:
##########
@@ -403,6 +405,24 @@ object ShuffleExchangeExec {
       case h: HashPartitioning =>
         val projection = UnsafeProjection.create(h.partitionIdExpression :: 
Nil, outputAttributes)
         row => projection(row).getInt(0)
+      case h: NullAwareHashPartitioning =>
+        val partitionIdProjection =
+          UnsafeProjection.create(h.partitionIdExpression :: Nil, 
outputAttributes)
+        val joinKeyProjection = UnsafeProjection.create(h.expressions, 
outputAttributes)
+        var nullKeyPartition =
+          new 
XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)
+        row => {
+          val joinKeys = joinKeyProjection(row)
+          if (joinKeys.anyNull()) {
+            // NULL join keys cannot match under ordinary equi-join semantics. 
Spread them
+            // round-robin within each map task so identical rows do not 
collapse to one reducer.
+            val partition = nullKeyPartition
+            nullKeyPartition = (nullKeyPartition + 1) % h.numPartitions
+            partition
+          } else {
+            partitionIdProjection(row).getInt(0)
+          }
+        }

Review Comment:
   I updated it to evaluate the join keys only once, but the logic became more complicated. Please take another look!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to