c21 commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r469569295



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {

Review comment:
       @agrawaldevesh - sure. I will follow your compromised suggestion here. 
Please see the previous comment for code coverage screenshot. thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,213 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow, streamNullJoinRow,
+        joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, 
buildNullRow,
+        streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow,
+        streamNullJoinRow, joinRowWithStream, joinRowWithBuild, 
streamNullJoinRowWithBuild,
+        buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRow: JoinedRow,
+      streamNullJoinRow: JoinedRow,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.numKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2)
+          val buildRow = buildIter.next
+          if (boundCondition(joinRowWithBuild(buildRow))) {
+            matchedKeys.set(keyIndex)
+            joinRow

Review comment:
       @cloud-fan - sure. updated.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value 
as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else 
null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as 
long) as k3"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+          "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as 
long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",

Review comment:
       > You have used the word 'careful tuning' several times but I am not 
fully following out of my stupidity: Do you mean that to say that the broadcast 
threshold should be chosen to be larger than the build side size, so as to NOT 
trigger a BHJ ?
   
   @agrawaldevesh - shuffled hash join depends on these two configs (broadcast 
join threshold and number of shuffle partitions), to be enabled. Planner side 
of logic is 
[here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L347-L355).
 Only when one side of join is smaller than (broadcast_join_threshold * 
number_of_shuffle_partitions), the shuffled hash join can be triggered (another 
requirement is another side should be 3x larger than this side, code is 
[here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L365)).
   
   Updated with one comment for this.
   
   
   > Btw, is it worth testing with shuffle partitions = 1 ?
   
   I don't think so, as if there's only one shuffle partition, we should use 
broadcast hash join instead.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -179,6 +201,49 @@ private[joins] class UnsafeHashedRelation(
     }
   }
 
+  override def getWithKeyIndex(key: InternalRow): (Int, Iterator[InternalRow]) 
= {

Review comment:
       @cloud-fan - sure, added. Was not adding that for simplicity of code, 
but added now for better performance per request.

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -601,6 +656,14 @@ public boolean isDefined() {
       return isDefined;
     }
 
+    /**
+     * Returns index for key.
+     */
+    public int getKeyIndex() {
+      assert (isDefined);

Review comment:
       @cloud-fan - I am following other code in this file. I prefer to keep it 
as it is for consistency in this file, unless we want to change it strongly.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -66,6 +66,23 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
     throw new UnsupportedOperationException
   }
 
+  /**
+   * Returns key index and matched rows.
+   *
+   * Returns null if there is no matched rows.
+   */
+  def getWithKeyIndex(key: InternalRow): (Int, Iterator[InternalRow])
+
+  /**
+   * Returns an iterator for keys index and rows of InternalRow type.
+   */
+  def valuesWithKeyIndex(): Iterator[(Int, InternalRow)]

Review comment:
       @cloud-fan - good suggestion. Updated.

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,61 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here (See `UnsafeHashedRelation` 
for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+        keyIndex++;
+      }
+      numRecords--;
+      return loc;
+    }
+  }
+
+  /**
+   * Returns an iterator for iterating over the entries of this map,
+   * by first iterating over the key index inside hash map's `longArray`.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link 
Location} object.
+   *
+   * The returned iterator is NOT thread-safe. If the map is modified while 
iterating over it,
+   * the behavior of the returned iterator is undefined.
+   */
+  public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
+    return new MapIteratorWithKeyIndex(numValues, new Location());
+  }
+
+  /**
+   * Number of allowed keys index.

Review comment:
       @cloud-fan - yes, updated with a better name.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value 
as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else 
null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(

Review comment:
       @agrawaldevesh - sure. updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,213 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow, streamNullJoinRow,
+        joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, 
buildNullRow,
+        streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow,
+        streamNullJoinRow, joinRowWithStream, joinRowWithBuild, 
streamNullJoinRowWithBuild,
+        buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRow: JoinedRow,
+      streamNullJoinRow: JoinedRow,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.numKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2)
+          val buildRow = buildIter.next
+          if (boundCondition(joinRowWithBuild(buildRow))) {
+            matchedKeys.set(keyIndex)
+            joinRow

Review comment:
       @cloud-fan - sure, updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,213 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow, streamNullJoinRow,
+        joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, 
buildNullRow,
+        streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow,
+        streamNullJoinRow, joinRowWithStream, joinRowWithBuild, 
streamNullJoinRowWithBuild,
+        buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRow: JoinedRow,
+      streamNullJoinRow: JoinedRow,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.numKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2)
+          val buildRow = buildIter.next
+          if (boundCondition(joinRowWithBuild(buildRow))) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      case (keyIndex, brow) =>
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          streamNullJoinRowWithBuild(brow)
+          Some(streamNullJoinRow)

Review comment:
       @cloud-fan - sure, updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,213 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow, streamNullJoinRow,
+        joinRowWithStream, joinRowWithBuild, streamNullJoinRowWithBuild, 
buildNullRow,
+        streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRow,
+        streamNullJoinRow, joinRowWithStream, joinRowWithBuild, 
streamNullJoinRowWithBuild,
+        buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRow: JoinedRow,
+      streamNullJoinRow: JoinedRow,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.numKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2)
+          val buildRow = buildIter.next
+          if (boundCondition(joinRowWithBuild(buildRow))) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      case (keyIndex, brow) =>
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          streamNullJoinRowWithBuild(brow)
+          Some(streamNullJoinRow)

Review comment:
       @cloud-fan - sure. updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to