agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r459804718



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def asReadOnlyCopy(): HashedRelation
 
+
+  /**
+   * Normally HashedRelation is built from an Source (input: 
Iterator[InternalRow]).
+   * This indicates the original input is empty.
+   * Note that, the hashed relation can be empty despite the input being not 
empty,
+   * since the hashed relation skips over null keys.
+   */
+  var isOriginalInputEmpty: Boolean
+
+  def setOriginInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {

Review comment:
       Oops — forgot to change the name of the setter.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -454,6 +491,48 @@ case class BroadcastHashJoinExec(
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
 
+    // fast stop if isOriginalInputEmpty = true, should accept all rows in 
streamedSide
+    if (broadcastRelation.value.isOriginalInputEmpty) {
+      return s"""
+                |// Anti Join isOriginalInputEmpty(true) accept all
+                |$numOutput.add(1);
+                |${consume(ctx, input)}
+          """.stripMargin
+    }
+
+    if (isNullAwareAntiJoin) {
+      if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) {
+        return s"""
+                  |// NAAJ

Review comment:
       It would be awesome to verify using 
org.apache.spark.sql.execution.debug.DebugQuery#debugCodegen that this case 
does not generate an empty spin-loop — i.e., a loop with no executable body. 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
     var numFields = 0
+    val numKeys = key.length
+    val isOriginalInputEmpty = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val row = input.next().asInstanceOf[UnsafeRow]
       numFields = row.numFields()
       val key = keyGenerator(row)
-      if (!key.anyNull) {
+      if ((0 until numKeys).forall(key.isNullAt)) {

Review comment:
       I think you should only do this check if isNullAware is true. Why pay 
the perf penalty of checking the row again otherwise?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
     var numFields = 0
+    val numKeys = key.length
+    val isOriginalInputEmpty = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val row = input.next().asInstanceOf[UnsafeRow]
       numFields = row.numFields()
       val key = keyGenerator(row)
-      if (!key.anyNull) {
+      if ((0 until numKeys).forall(key.isNullAt)) {
+        allNullColumnKeyExistsInOriginalInput = true
+      }
+
+      if (isNullAware || (!isNullAware && !key.anyNull)) {

Review comment:
       Can we simplify this to: `isNullAware || !key.anyNull`? 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +967,32 @@ private[joins] object LongHashedRelation {
 
     // Create a mapping of key -> rows
     var numFields = 0
+    val isOriginalInputEmpty: Boolean = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
       numFields = unsafeRow.numFields()
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.isNullAt(0)) {
+        // LongToUnsafeRowMap can't insert null key
         val key = rowKey.getLong(0)
         map.append(key, unsafeRow)
+      } else {
+        // LongHashedRelation is single-column key

Review comment:
       LongHashedRelation only stores single column keys.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to