c21 commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467157300



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
       key: Seq[Expression],
       sizeEstimate: Int = 64,
       taskMemoryManager: TaskMemoryManager = null,
-      isNullAware: Boolean = false): HashedRelation = {
+      isNullAware: Boolean = false,
+      isLookupAware: Boolean = false,
+      value: Option[Seq[Expression]] = None): HashedRelation = {

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -58,8 +65,19 @@ case class ShuffledHashJoinExec(
     val buildTime = longMetric("buildTime")
     val start = System.nanoTime()
     val context = TaskContext.get()
+
+    val (isLookupAware, value) =

Review comment:
       @maropu - updated. Just curious - could you help point me where 
specifically this format rule is? I am seeing in-consistency in codebase for 
this, thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
##########
@@ -40,4 +41,24 @@ trait ShuffledJoin extends BaseJoinExec {
       throw new IllegalArgumentException(
         s"ShuffledJoin should not take $x as the JoinType")
   }
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case _: InnerLike =>
+        left.output ++ right.output
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case FullOuter =>
+        (left.output ++ right.output).map(_.withNullability(true))
+      case j: ExistenceJoin =>
+        left.output :+ j.exists
+      case LeftExistence(_) =>
+        left.output
+      case x =>
+        throw new IllegalArgumentException(
+          s"ShuffledJoin not take $x as the JoinType")

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -327,23 +353,48 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
     var numFields = 0
-    while (input.hasNext) {
-      val row = input.next().asInstanceOf[UnsafeRow]
-      numFields = row.numFields()
-      val key = keyGenerator(row)
-      if (!key.anyNull) {
+
+    if (isLookupAware) {
+      // Add one extra boolean value at the end as part of the row,
+      // to track the information that whether the corresponding key
+      // has been looked up or not. See `ShuffledHashJoin.fullOuterJoin` for 
example of usage.
+      val valueGenerator = UnsafeProjection.create(value.get :+ Literal(false))
+
+      while (input.hasNext) {
+        val row = input.next().asInstanceOf[UnsafeRow]
+        numFields = row.numFields() + 1
+        val key = keyGenerator(row)
+        val value = valueGenerator(row)
         val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes)
         val success = loc.append(
           key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-          row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
+          value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
         if (!success) {
           binaryMap.free()
           // scalastyle:off throwerror
           throw new SparkOutOfMemoryError("There is not enough memory to build 
hash map")
           // scalastyle:on throwerror
         }
-      } else if (isNullAware) {
-        return EmptyHashedRelationWithAllNullKeys
+      }
+    } else {
+      while (input.hasNext) {
+        val row = input.next().asInstanceOf[UnsafeRow]
+        numFields = row.numFields()
+        val key = keyGenerator(row)
+        if (!key.anyNull) {
+          val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes)
+          val success = loc.append(
+            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
+          if (!success) {
+            binaryMap.free()
+            // scalastyle:off throwerror
+            throw new SparkOutOfMemoryError("There is not enough memory to 
build hash map")
+            // scalastyle:on throwerror
+          }
+        } else if (isNullAware) {
+          return EmptyHashedRelationWithAllNullKeys
+        }

Review comment:
       @maropu - thanks, it looks better, updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -305,6 +312,23 @@ private[joins] class UnsafeHashedRelation(
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
     read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
+
+  override def values(): Iterator[InternalRow] = {
+    val iter = binaryMap.iterator()
+
+    new Iterator[InternalRow] {
+      override def hasNext: Boolean = iter.hasNext
+
+      override def next(): InternalRow = {
+        if (!hasNext) {
+          throw new NoSuchElementException("End of the iterator")
+        }
+        val loc = iter.next()
+        resultRow.pointTo(loc.getValueBase, loc.getValueOffset, 
loc.getValueLength)
+        resultRow

Review comment:
       @maropu - updated.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -260,8 +260,8 @@ trait JoinSelectionHelper {
       canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
     }
     getBuildSide(
-      canBuildLeft(joinType) && buildLeft,
-      canBuildRight(joinType) && buildRight,
+      canBuildShuffledHashJoinLeft(joinType) && buildLeft,

Review comment:
       @maropu - `autoBroadcastJoinThreshold` has nothing to do with 
`canBuildShuffledHashJoinLeft/Right` and `canBuildBroadcastLeft/Right`. I am 
just refactoring the decision related to join type here. 
`autoBroadcastJoinThreshold` is used in `canBroadcastBySize` and 
`canBuildLocalHashMapBySize` which I didn't touch. IMO it would be risky to 
introduce a new config for SHJ here as users would have existing pipelines to 
be carefully tuning `autoBroadcastJoinThreshold` to enable SHJ.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
       key: Seq[Expression],
       sizeEstimate: Int = 64,
       taskMemoryManager: TaskMemoryManager = null,
-      isNullAware: Boolean = false): HashedRelation = {
+      isNullAware: Boolean = false,
+      isLookupAware: Boolean = false,

Review comment:
       @agrawaldevesh - added a `@param` documentation to explain a bit. 
Honestly I couldn't figure a better name besides `isLookupAware`, as I feel if 
a hash map is aware of looking up, which can mean that when it's looked up, it 
will take some action, e.g. here track the row has been looked up. Please 
suggest a better name if any in your mind, thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
       key: Seq[Expression],
       sizeEstimate: Int = 64,
       taskMemoryManager: TaskMemoryManager = null,
-      isNullAware: Boolean = false): HashedRelation = {
+      isNullAware: Boolean = false,
+      isLookupAware: Boolean = false,
+      value: Option[Seq[Expression]] = None): HashedRelation = {

Review comment:
       @agrawaldevesh - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -110,10 +117,10 @@ private[execution] object HashedRelation {
 
     if (isNullAware && !input.hasNext) {
       EmptyHashedRelation
-    } else if (key.length == 1 && key.head.dataType == LongType) {
+    } else if (key.length == 1 && key.head.dataType == LongType && 
!isLookupAware) {

Review comment:
       @maropu - we cannot, as we need to insert NULL key inside 
`HashedRelation` for full outer join. Updated with one comment.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *    with extra boolean value at the end of row to track look up information
+   *    (done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *    and mark the matched rows from build side be looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *    and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    abstract class HashJoinedRow extends JoinedRow {

Review comment:
       @maropu - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *    with extra boolean value at the end of row to track look up information
+   *    (done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *    and mark the matched rows from build side be looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *    and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    abstract class HashJoinedRow extends JoinedRow {
+      /** Updates this JoinedRow by updating its stream side row. Returns 
itself. */
+      def withStream(newStream: InternalRow): JoinedRow
+
+      /** Updates this JoinedRow by updating its build side row. Returns 
itself. */
+      def withBuild(newBuild: InternalRow): JoinedRow
     }
+    val joinRow: HashJoinedRow = buildSide match {
+      case BuildLeft =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withRight(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withLeft(newBuild)
+        }
+      case BuildRight =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withLeft(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withRight(newBuild)
+        }
+    }
+    val joinKeys = streamSideKeyGenerator()
+    val buildRowGenerator = UnsafeProjection.create(buildOutput, buildOutput)
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+
+    def markRowLookedUp(row: UnsafeRow): Unit =
+      row.setBoolean(row.numFields() - 1, true)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter =
+      if (hashedRelation.keyIsUnique) {
+        streamIter.map { srow =>
+          joinRow.withStream(srow)
+          val keys = joinKeys(srow)
+          if (keys.anyNull) {
+            joinRow.withBuild(buildNullRow)
+          } else {
+            val matched = hashedRelation.getValue(keys)
+            if (matched != null) {
+              val buildRow = buildRowGenerator(matched)
+              if (boundCondition(joinRow.withBuild(buildRow))) {
+                markRowLookedUp(matched.asInstanceOf[UnsafeRow])
+                joinRow
+              } else {
+                joinRow.withBuild(buildNullRow)
+              }
+            } else {
+              joinRow.withBuild(buildNullRow)
+            }
+          }
+        }
+      } else {
+        streamIter.flatMap { srow =>
+          joinRow.withStream(srow)
+          val keys = joinKeys(srow)
+          if (keys.anyNull) {
+            Iterator.single(joinRow.withBuild(buildNullRow))
+          } else {
+            val buildIter = hashedRelation.get(keys)
+            new RowIterator {
+              private var found = false
+              override def advanceNext(): Boolean = {
+                while (buildIter != null && buildIter.hasNext) {
+                  val matched = buildIter.next()
+                  val buildRow = buildRowGenerator(matched)
+                  if (boundCondition(joinRow.withBuild(buildRow))) {
+                    markRowLookedUp(matched.asInstanceOf[UnsafeRow])
+                    found = true
+                    return true
+                  }
+                }
+                if (!found) {
+                  joinRow.withBuild(buildNullRow)
+                  found = true
+                  return true
+                }
+                false
+              }
+              override def getRow: InternalRow = joinRow
+            }.toScala
+          }
+        }
+      }
+
+    // Process build side with filtering out rows looked up already
+    val buildResultIter = hashedRelation.values().flatMap { brow =>
+      val unsafebrow = brow.asInstanceOf[UnsafeRow]
+      val isLookup = unsafebrow.getBoolean(unsafebrow.numFields() - 1)
+      if (!isLookup) {
+        val buildRow = buildRowGenerator(unsafebrow)
+        joinRow.withBuild(buildRow)
+        joinRow.withStream(streamNullRow)
+        Some(joinRow)
+      } else {
+        None
+      }
+    }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    (streamResultIter ++ buildResultIter).map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  // TODO: support full outer shuffled hash join code-gen

Review comment:
       @agrawaldevesh - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -885,6 +936,12 @@ class LongHashedRelation(
    * Returns an iterator for keys of InternalRow type.
    */
   override def keys(): Iterator[InternalRow] = map.keys()
+
+  override def values(): Iterator[InternalRow] = {

Review comment:
       @agrawaldevesh - "value" here means the value of the hash-table. 
Actually I don't use `value` for extra bit at the end of row in this PR. I use 
`value` either to be expressions of original value before creating a hash-table 
(`UnsafeHasedRelation.apply()`), or here to be the values of hash-table.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *    with extra boolean value at the end of row to track look up information
+   *    (done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *    and mark the matched rows from build side be looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *    and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    abstract class HashJoinedRow extends JoinedRow {
+      /** Updates this JoinedRow by updating its stream side row. Returns 
itself. */
+      def withStream(newStream: InternalRow): JoinedRow
+
+      /** Updates this JoinedRow by updating its build side row. Returns 
itself. */
+      def withBuild(newBuild: InternalRow): JoinedRow
     }
+    val joinRow: HashJoinedRow = buildSide match {
+      case BuildLeft =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withRight(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withLeft(newBuild)
+        }
+      case BuildRight =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withLeft(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withRight(newBuild)
+        }
+    }
+    val joinKeys = streamSideKeyGenerator()
+    val buildRowGenerator = UnsafeProjection.create(buildOutput, buildOutput)
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+
+    def markRowLookedUp(row: UnsafeRow): Unit =
+      row.setBoolean(row.numFields() - 1, true)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter =
+      if (hashedRelation.keyIsUnique) {
+        streamIter.map { srow =>
+          joinRow.withStream(srow)
+          val keys = joinKeys(srow)
+          if (keys.anyNull) {
+            joinRow.withBuild(buildNullRow)
+          } else {
+            val matched = hashedRelation.getValue(keys)
+            if (matched != null) {
+              val buildRow = buildRowGenerator(matched)
+              if (boundCondition(joinRow.withBuild(buildRow))) {
+                markRowLookedUp(matched.asInstanceOf[UnsafeRow])

Review comment:
       @agrawaldevesh - yes I agree the marking does not have to be in hash 
table, and we can have a separate bitset to keep this information to save 
space. To make the bitset efficient, we need to tie the implementation of 
bitset with `BytesToBytesMap`, and manage it with off heap memory manager. When 
writing the PR, I first checked if there's a ready to use data structure in the 
code base but couldn't find one. That involves much more code change and review 
cycle, and I would like to start with a simpler solution as full out shuffled 
hash join is a new feature, and we can evolve the storage optimization if 
needed in the future.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *    with extra boolean value at the end of row to track look up information
+   *    (done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *    and mark the matched rows from build side be looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *    and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    abstract class HashJoinedRow extends JoinedRow {
+      /** Updates this JoinedRow by updating its stream side row. Returns 
itself. */
+      def withStream(newStream: InternalRow): JoinedRow
+
+      /** Updates this JoinedRow by updating its build side row. Returns 
itself. */
+      def withBuild(newBuild: InternalRow): JoinedRow
     }
+    val joinRow: HashJoinedRow = buildSide match {
+      case BuildLeft =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withRight(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withLeft(newBuild)
+        }
+      case BuildRight =>
+        new HashJoinedRow {
+          override def withStream(newStream: InternalRow): JoinedRow = 
withLeft(newStream)
+          override def withBuild(newBuild: InternalRow): JoinedRow = 
withRight(newBuild)
+        }
+    }
+    val joinKeys = streamSideKeyGenerator()
+    val buildRowGenerator = UnsafeProjection.create(buildOutput, buildOutput)
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+
+    def markRowLookedUp(row: UnsafeRow): Unit =
+      row.setBoolean(row.numFields() - 1, true)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter =
+      if (hashedRelation.keyIsUnique) {
+        streamIter.map { srow =>
+          joinRow.withStream(srow)
+          val keys = joinKeys(srow)
+          if (keys.anyNull) {
+            joinRow.withBuild(buildNullRow)
+          } else {
+            val matched = hashedRelation.getValue(keys)
+            if (matched != null) {
+              val buildRow = buildRowGenerator(matched)
+              if (boundCondition(joinRow.withBuild(buildRow))) {
+                markRowLookedUp(matched.asInstanceOf[UnsafeRow])
+                joinRow
+              } else {
+                joinRow.withBuild(buildNullRow)
+              }
+            } else {
+              joinRow.withBuild(buildNullRow)
+            }
+          }
+        }
+      } else {
+        streamIter.flatMap { srow =>
+          joinRow.withStream(srow)
+          val keys = joinKeys(srow)
+          if (keys.anyNull) {
+            Iterator.single(joinRow.withBuild(buildNullRow))
+          } else {
+            val buildIter = hashedRelation.get(keys)
+            new RowIterator {
+              private var found = false
+              override def advanceNext(): Boolean = {
+                while (buildIter != null && buildIter.hasNext) {
+                  val matched = buildIter.next()
+                  val buildRow = buildRowGenerator(matched)
+                  if (boundCondition(joinRow.withBuild(buildRow))) {
+                    markRowLookedUp(matched.asInstanceOf[UnsafeRow])
+                    found = true
+                    return true
+                  }
+                }
+                if (!found) {
+                  joinRow.withBuild(buildNullRow)
+                  found = true
+                  return true
+                }
+                false
+              }
+              override def getRow: InternalRow = joinRow
+            }.toScala
+          }
+        }
+      }
+
+    // Process build side with filtering out rows looked up already
+    val buildResultIter = hashedRelation.values().flatMap { brow =>

Review comment:
       @agrawaldevesh - normally I would change, but I need a `buildRow` before 
in Line 201, and `brow` here is somewhat consistency with `srow` in other 
places of codebase. Could we leave it as it is? Thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)

Review comment:
       @agrawaldevesh, @maropu - The feature is essentially disabled by 
default, as shuffled hash join is disabled by default by [config 
`spark.sql.join.preferSortMergeJoin=true`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L213).
 So I don't think we need a separate config to disable this feature. In 
addition, set -1 to `spark.sql.autoBroadcastJoinThreshold` would [disable 
BHJ](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L278)
 but not SHJ. [SHJ needs the config to be carefully tuned to be enabled, but 
not 
-1](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L354).
 So when user explicitly enables shuffled hash join by (1).setting 
`spark.sql.join.preferSortMergeJoin`=false, and (2).carefully tuning 
`spark.sql.autoBroadcastJoinThreshold`, they should probably
  have a clear mind what they are doing.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -76,6 +76,11 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def keys(): Iterator[InternalRow]
 
+  /**
+   * Returns an iterator for values of InternalRow type.
+   */
+  def values(): Iterator[InternalRow]

Review comment:
       @agrawaldevesh - this is to return all `values` of a key-value pair 
hashed relation.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -314,7 +338,9 @@ private[joins] object UnsafeHashedRelation {
       key: Seq[Expression],
       sizeEstimate: Int,
       taskMemoryManager: TaskMemoryManager,
-      isNullAware: Boolean = false): HashedRelation = {
+      isNullAware: Boolean = false,

Review comment:
       @agrawaldevesh - updated with a check to throw exception in this case.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -314,7 +338,9 @@ private[joins] object UnsafeHashedRelation {
       key: Seq[Expression],
       sizeEstimate: Int,
       taskMemoryManager: TaskMemoryManager,
-      isNullAware: Boolean = false): HashedRelation = {
+      isNullAware: Boolean = false,
+      isLookupAware: Boolean = false,
+      value: Option[Seq[Expression]] = None): HashedRelation = {

Review comment:
       @agrawaldevesh - `value` here is the expression for value of key-value 
hash map. e.g. a HasedRelation(key: int, value: <int, double>). `value` here is 
the expression <int, double>. Similar to parameter `key` above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to