agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r459570948



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -46,14 +46,26 @@ case class BroadcastHashJoinExec(
     buildSide: BuildSide,
     condition: Option[Expression],
     left: SparkPlan,
-    right: SparkPlan)
+    right: SparkPlan,
+    isNullAwareAntiJoin: Boolean = false)
   extends HashJoin with CodegenSupport {
 
+  if (isNullAwareAntiJoin) {
+    // TODO support multi column NULL-aware anti join in future.

Review comment:
       Perhaps this comment block can be moved to the 
ExtractSingleColumnNullAwareAntiJoin ? To explain why we only specialize for 
single columns.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -133,10 +145,39 @@ case class BroadcastHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
-    streamedPlan.execute().mapPartitions { streamedIter =>
-      val hashed = broadcastRelation.value.asReadOnlyCopy()
-      
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows)
+    if (isNullAwareAntiJoin) {
+      streamedPlan.execute().mapPartitionsInternal { streamedIter =>
+        val hashed = broadcastRelation.value.asReadOnlyCopy()
+        
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
+        if (hashed.isOriginInputEmpty) {
+          streamedIter
+        } else if (hashed.allNullColumnKeyExistsInOriginInput) {
+          Iterator.empty
+        } else {
+          val keyGenerator = UnsafeProjection.create(
+            BindReferences.bindReferences[Expression](
+              leftKeys,
+              AttributeSeq(left.output))
+          )
+          streamedIter.filter(row => {
+            val lookupKey: UnsafeRow = keyGenerator(row)
+            if (lookupKey.allNull()) {

Review comment:
       nit: Should we add a comment here that lookupKey contains only a single 
column ? It will make understanding "allNull" easier. 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,32 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def asReadOnlyCopy(): HashedRelation
 
+
+  /**
+   * Normally HashedRelation is built from an Source (input: 
Iterator[InternalRow]).
+   * This indicates the origin input is empty.
+   * Note that, the hashed relation can be empty despite the input being not 
empty,
+   * since the hashed relation skips over null keys.
+   */
+  var isOriginInputEmpty: Boolean

Review comment:
       You mean `isOriginalInputEmpty` ?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +371,19 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
     var numFields = 0
+    val numKeys = key.length
+    val isOriginInputEmpty = !input.hasNext
+    var allNullColumnKeyExistsInOriginInput: Boolean = false
     while (input.hasNext) {
       val row = input.next().asInstanceOf[UnsafeRow]
       numFields = row.numFields()
       val key = keyGenerator(row)
-      if (!key.anyNull) {
+      if ((0 until numKeys).forall(key.isNullAt)) {
+        allNullColumnKeyExistsInOriginInput = true
+      }
+
+      // TODO keep anyNull key for multi column NAAJ support in future

Review comment:
       I would say to remove all TODO's around multi-key. It is distracting.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -453,6 +494,47 @@ case class BroadcastHashJoinExec(
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
+    val isLongHashedRelation = 
broadcastRelation.value.isInstanceOf[LongHashedRelation]
+
+    // fast stop if isOriginInputEmpty = true
+    // whether isNullAwareAntiJoin is true or false
+    // should accept all rows in streamedSide
+    if (broadcastRelation.value.isOriginInputEmpty) {
+      return s"""
+                |// Common Anti Join isOriginInputEmpty(true) accept all
+                |$numOutput.add(1);
+                |${consume(ctx, input)}
+          """.stripMargin
+    }
+
+    if (isNullAwareAntiJoin) {
+      if (broadcastRelation.value.allNullColumnKeyExistsInOriginInput) {
+        return s"""
+                  |// NAAJ isOriginInputEmpty(false) anyNullKeyExists(true) 
reject all
+            """.stripMargin
+      } else {
+        val found = ctx.freshName("found")
+        return s"""
+                  |// NAAJ isOriginInputEmpty(false) 
allNullColumnKeyExistsInOriginInput(false)
+                  |boolean $found = false;
+                  |// generate join key for stream side
+                  |${keyEv.code}
+                  |if (${ if (isLongHashedRelation) s"$anyNull" else 
s"${keyEv.value}.allNull()"}) {

Review comment:
       I am finding the `allNull` / `anyNull` to be mixed here. Why is one 
branch of the code checking for `anyNull`, and the other `allNull` ?
   
   Is `allNull` in the else branch a holdover from "We may support multi-column 
one day" ? 

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
##########
@@ -591,6 +591,15 @@ public boolean anyNull() {
     return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 
8);
   }
 
+  public boolean allNull() {

Review comment:
       IMHO, lets keep the implementation very "Single key" focussed. So my 
understanding here is that this expression is only called for "Key Row". That 
"Key row" should have only a single column, Right ? ie, can `numFields > 1` ?
   
   If not, perhaps we can reword this and/or inline it. It's only used in 
two/three places so perhaps okay to duplicate given that we are modifying a 
very core class `UnsafeRow` here.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -186,6 +186,13 @@ trait HashJoin extends BaseJoinExec {
   private def antiJoin(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation): Iterator[InternalRow] = {
+    // fast stop if isOriginInputEmpty = true
+    // whether isNullAwareAntiJoin is true or false

Review comment:
       Do we need to highlight NAAJ here or can the comment on line 190 be 
dropped ?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -453,6 +494,47 @@ case class BroadcastHashJoinExec(
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
+    val isLongHashedRelation = 
broadcastRelation.value.isInstanceOf[LongHashedRelation]
+
+    // fast stop if isOriginInputEmpty = true
+    // whether isNullAwareAntiJoin is true or false
+    // should accept all rows in streamedSide
+    if (broadcastRelation.value.isOriginInputEmpty) {
+      return s"""
+                |// Common Anti Join isOriginInputEmpty(true) accept all
+                |$numOutput.add(1);
+                |${consume(ctx, input)}
+          """.stripMargin
+    }
+
+    if (isNullAwareAntiJoin) {
+      if (broadcastRelation.value.allNullColumnKeyExistsInOriginInput) {
+        return s"""
+                  |// NAAJ isOriginInputEmpty(false) anyNullKeyExists(true) 
reject all

Review comment:
       Can you paste the gencode in this condition ? 
   
   It appears that we would run an empty loop here: 
   ```
   while (stream side has rows) {
   // drop the row
   }
   ```
   
   Should we instead like omit the whole loop ? Or will that be too complex ? 
How is an "Always false" filter handled in spark ?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -96,7 +122,8 @@ private[execution] object HashedRelation {
       input: Iterator[InternalRow],
       key: Seq[Expression],
       sizeEstimate: Int = 64,
-      taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
+      taskMemoryManager: TaskMemoryManager = null,
+      isNullAware: Boolean = false): HashedRelation = {

Review comment:
       How about renaming this to `storeRowsWithNullKeys` ?
   
   Or adding a comment about what it truly means, either here or on line 386 
below.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -453,6 +494,47 @@ case class BroadcastHashJoinExec(
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
+    val isLongHashedRelation = 
broadcastRelation.value.isInstanceOf[LongHashedRelation]
+
+    // fast stop if isOriginInputEmpty = true
+    // whether isNullAwareAntiJoin is true or false

Review comment:
       Same comment as above, is this NAAJ highlight needed here ?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -133,10 +145,39 @@ case class BroadcastHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
-    streamedPlan.execute().mapPartitions { streamedIter =>
-      val hashed = broadcastRelation.value.asReadOnlyCopy()
-      
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows)
+    if (isNullAwareAntiJoin) {
+      streamedPlan.execute().mapPartitionsInternal { streamedIter =>
+        val hashed = broadcastRelation.value.asReadOnlyCopy()
+        
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
+        if (hashed.isOriginInputEmpty) {
+          streamedIter
+        } else if (hashed.allNullColumnKeyExistsInOriginInput) {
+          Iterator.empty
+        } else {
+          val keyGenerator = UnsafeProjection.create(
+            BindReferences.bindReferences[Expression](
+              leftKeys,
+              AttributeSeq(left.output))
+          )
+          streamedIter.filter(row => {
+            val lookupKey: UnsafeRow = keyGenerator(row)
+            if (lookupKey.allNull()) {
+              false
+            } else {
+              // in isNullAware mode
+              // UnsafeHashedRelation include anyNull key, if match, dropped 
row
+              // Same as LongHashedRelation where lookupKey isNotNullAt(0)

Review comment:
       This comment is a bit confusing, is it possible to reword it ? Would a 
simpler explanation be: "Anti Join: Drop the row on the streamed side if it is 
a match on the build"

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -453,6 +494,47 @@ case class BroadcastHashJoinExec(
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
+    val isLongHashedRelation = 
broadcastRelation.value.isInstanceOf[LongHashedRelation]

Review comment:
       Can you move this check to in the branch it is used ? 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,32 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def asReadOnlyCopy(): HashedRelation
 
+
+  /**
+   * Normally HashedRelation is built from an Source (input: 
Iterator[InternalRow]).
+   * This indicates the origin input is empty.
+   * Note that, the hashed relation can be empty despite the input being not 
empty,
+   * since the hashed relation skips over null keys.
+   */
+  var isOriginInputEmpty: Boolean
+  def setOriginInputEmtpy(isOriginInputEmpty: Boolean): HashedRelation = {
+    this.isOriginInputEmpty = isOriginInputEmpty
+    this
+  }
+
+  /**
+   * It's only used in null aware anti join.
+   * This will be set true if Source (input: Iterator[InternalRow]) contains a 
key,
+   * which is allNullColumn.
+   */
+  var allNullColumnKeyExistsInOriginInput: Boolean
+  def setAllNullColumnKeyExistsInOriginInput(
+      allNullColumnKeyExistsInOriginInput: Boolean): HashedRelation = {
+    this.allNullColumnKeyExistsInOriginInput =

Review comment:
       nit: Should this line be split into two ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to