[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-15 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470892621



##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+val inputDFs = Seq(
+  // Test unique join key
+  (spark.range(10).selectExpr("id as k1"),
+spark.range(30).selectExpr("id as k2"),
+$"k1" === $"k2"),
+  // Test non-unique join key
+  (spark.range(10).selectExpr("id % 5 as k1"),
+spark.range(30).selectExpr("id % 5 as k2"),
+$"k1" === $"k2"),
+  // Test string join key
+  (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+spark.range(30).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test build side at right
+  (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+spark.range(10).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test NULL join key
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+$"k1" === $"k2"),
+  // Test multiple join keys
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(

Review comment:
   Oh, I see. You have combined it into the multiple-join-key code path.
   
   Is it worth also having uncorrelated nulls as a single-key test (like on line 1211)?
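   Something like the following extra tuple in `inputDFs` is what I have in mind (just a sketch reusing the pattern already in this test and its implicits; the moduli are arbitrary, chosen so the null patterns on the two sides are independent):

```scala
// Sketch: single join key, with NULLs generated independently on each side.
(spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
  spark.range(30).map(i => if (i % 7 == 0) i else null).selectExpr("value as k2"),
  $"k1" === $"k2")
```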





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-14 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470873077



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +85,215 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+case _ => join(streamIter, hashed, numOutputRows)
+  }
+}
+  }
+
+  private def fullOuterJoin(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
+val joinKeys = streamSideKeyGenerator()
+val joinRow = new JoinedRow
+val (joinRowWithStream, joinRowWithBuild) = {
+  buildSide match {
+case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+  }
+}
+val buildNullRow = new GenericInternalRow(buildOutput.length)
+val streamNullRow = new GenericInternalRow(streamedOutput.length)
+lazy val streamNullJoinRowWithBuild = {
+  buildSide match {
+case BuildLeft =>
+  joinRow.withRight(streamNullRow)
+  joinRow.withLeft _
+case BuildRight =>
+  joinRow.withLeft(streamNullRow)
+  joinRow.withRight _
+  }
+}
+
+val iter = if (hashedRelation.keyIsUnique) {
+  fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+} else {
+  fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
 }
+
+val resultProj = UnsafeProjection.create(output, output)
+iter.map { r =>
+  numOutputRows += 1
+  resultProj(r)
+}
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *Mark the matched rows from the build side as looked up.
+   *A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *Filter out rows from build side being matched already,
+   *by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  joinKeys: UnsafeProjection,
+  joinRowWithStream: InternalRow => JoinedRow,
+  joinRowWithBuild: InternalRow => JoinedRow,
+  streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
+  buildNullRow: GenericInternalRow,
+  streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+// Process stream side with looking up hash relation
+val streamResultIter = streamIter.map { srow =>
+  joinRowWithStream(srow)
+  val keys = joinKeys(srow)
+  if (keys.anyNull) {
+joinRowWithBuild(buildNullRow)
+  } else {
+val matched = hashedRelation.getValueWithKeyIndex(keys)
+if (matched != null) {
+  val keyIndex = matched.getKeyIndex
+  val buildRow = matched.getValue
+  val joinRow = joinRowWithBuild(buildRow)
+  if (boundCondition(joinRow)) {
+matchedKeys.set(keyIndex)
+joinRow
+  } else {
+joinRowWithBuild(buildNullRow)
+  }
+} else {
+  joinRowWithBuild(buildNullRow)
+}
+  }
+}
+
+// Process build side with filtering out the matched rows
+val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+  valueRowWithKeyIndex =>
+val keyIndex = valueRowWithKeyIndex.getKeyIndex
+val isMatched = matchedKeys.get(keyIndex)
+if (!isMatched) {
+  val buildRow = valueRowWithKeyIndex.getValue
+  Some(streamNullJoinRowWithBuild(buildRow))
+} else {
+  None
+}
+}
+
+streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *Mark the matched rows from the build side as looked up.
+   *A `HashSet[Long]` is used to track matched rows with
+   *key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *Filter out rows from build side being matched already,
+   *by checking key index and value index from `HashSet`.
+   *
+   * The "value index" is defined as 

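A minimal, self-contained sketch (not the PR's code) of how a key index (Int) and a value index (Int) can be packed into the single Long stored in such a `HashSet[Long]`:

```scala
import scala.collection.mutable

// Pack two 32-bit indices into one Long; unpack with shifts/masks.
def pack(keyIndex: Int, valueIndex: Int): Long =
  (keyIndex.toLong << 32) | (valueIndex.toLong & 0xFFFFFFFFL)
def unpackKeyIndex(packed: Long): Int = (packed >>> 32).toInt
def unpackValueIndex(packed: Long): Int = packed.toInt

// Record that the row at value index 7 under key index 3 was matched.
val matched = mutable.HashSet.empty[Long]
matched += pack(3, 7)
assert(unpackKeyIndex(matched.head) == 3 && unpackValueIndex(matched.head) == 7)
```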
[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-13 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470263293



##
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
 return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+private int keyIndex = 0;
+private int numRecords;
+private final Location loc;
+
+private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+  this.numRecords = numRecords;
+  this.loc = loc;
+}
+
+@Override
+public boolean hasNext() {
+  return numRecords > 0;
+}
+
+@Override
+public Location next() {
+  if (!loc.isDefined() || !loc.nextValue()) {
+while (longArray.get(keyIndex * 2) == 0) {
+  keyIndex++;
+}
+loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+keyIndex++;

Review comment:
   Exactly; that way we know that numRecords is 'safe' and keyIndex shouldn't overflow.








[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-13 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470248197



##
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
 return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+private int keyIndex = 0;
+private int numRecords;
+private final Location loc;
+
+private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+  this.numRecords = numRecords;
+  this.loc = loc;
+}
+
+@Override
+public boolean hasNext() {
+  return numRecords > 0;
+}
+
+@Override
+public Location next() {
+  if (!loc.isDefined() || !loc.nextValue()) {
+while (longArray.get(keyIndex * 2) == 0) {
+  keyIndex++;
+}
+loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+keyIndex++;

Review comment:
   I don't have a strong preference about checking `keyIndex`. I was more referring to making sure numRecords <= numValues. If we guarantee that, then keyIndex shouldn't grow beyond longArray.size().
   
   I also think a bounds check would be relatively cheap compared to the `taskMemoryManager.getPage(fullKeyAddress)` call buried inside Location.with, which should be fairly memory-bound.

##
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
 return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+private int keyIndex = 0;

Review comment:
   SGTM








[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-13 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470229887



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+case _ => join(streamIter, hashed, numOutputRows)
+  }
+}
+  }
+
+  private def fullOuterJoin(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
+val joinKeys = streamSideKeyGenerator()
+val joinRow = new JoinedRow
+val (joinRowWithStream, joinRowWithBuild) = {
+  buildSide match {
+case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+  }
+}
+val buildNullRow = new GenericInternalRow(buildOutput.length)
+val streamNullRow = new GenericInternalRow(streamedOutput.length)
+val streamNullJoinRow = new JoinedRow
+val streamNullJoinRowWithBuild = {
+  buildSide match {
+case BuildLeft =>
+  streamNullJoinRow.withRight(streamNullRow)
+  streamNullJoinRow.withLeft _
+case BuildRight =>
+  streamNullJoinRow.withLeft(streamNullRow)
+  streamNullJoinRow.withRight _
+  }
+}
+
+val iter = if (hashedRelation.keyIsUnique) {
+  fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+} else {
+  fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
 }
+
+val resultProj = UnsafeProjection.create(output, output)
+iter.map { r =>
+  numOutputRows += 1
+  resultProj(r)
+}
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *Mark the matched rows from the build side as looked up.
+   *A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *Filter out rows from build side being matched already,
+   *by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  joinKeys: UnsafeProjection,
+  joinRowWithStream: InternalRow => JoinedRow,
+  joinRowWithBuild: InternalRow => JoinedRow,
+  streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+  buildNullRow: GenericInternalRow,
+  streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+// Process stream side with looking up hash relation
+val streamResultIter = streamIter.map { srow =>
+  joinRowWithStream(srow)
+  val keys = joinKeys(srow)
+  if (keys.anyNull) {
+joinRowWithBuild(buildNullRow)
+  } else {
+val matched = hashedRelation.getValueWithKeyIndex(keys)
+if (matched != null) {
+  val keyIndex = matched.getKeyIndex
+  val buildRow = matched.getValue
+  val joinRow = joinRowWithBuild(buildRow)
+  if (boundCondition(joinRow)) {
+matchedKeys.set(keyIndex)
+joinRow
+  } else {
+joinRowWithBuild(buildNullRow)
+  }
+} else {
+  joinRowWithBuild(buildNullRow)
+}
+  }
+}
+
+// Process build side with filtering out rows looked up and
+// passed join condition already
+val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+  valueRowWithKeyIndex =>
+val keyIndex = valueRowWithKeyIndex.getKeyIndex
+val isMatched = matchedKeys.get(keyIndex)
+if (!isMatched) {
+  val buildRow = valueRowWithKeyIndex.getValue
+  Some(streamNullJoinRowWithBuild(buildRow))
+} else {
+  None
+}
+}
+
+streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *Mark the matched rows from the build side as looked up.
+   *A `HashSet[Long]` is used to track matched rows with
+   *key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *Filter out rows from build side being matched 

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-13 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470211909



##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+val inputDFs = Seq(
+  // Test unique join key
+  (spark.range(10).selectExpr("id as k1"),
+spark.range(30).selectExpr("id as k2"),
+$"k1" === $"k2"),
+  // Test non-unique join key
+  (spark.range(10).selectExpr("id % 5 as k1"),
+spark.range(30).selectExpr("id % 5 as k2"),
+$"k1" === $"k2"),
+  // Test string join key
+  (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+spark.range(30).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test build side at right
+  (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+spark.range(10).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test NULL join key
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+$"k1" === $"k2"),
+  // Test multiple join keys
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+"value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+  "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+$"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+)
+inputDFs.foreach { case (df1, df2, joinExprs) =>
+  withSQLConf(
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",

Review comment:
   Okay, fine :-P The assert on line 1234 convinces me that the magic number "works". Tricking this thing into using SHJ is hard!

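   For anyone else reading along, the kind of plan assertion being referenced looks roughly like this (sketch only, assuming the suite's imports; the real test may go through an AQE-aware helper):

```scala
val joined = df1.join(df2, joinExprs, "full_outer")
// Confirm the planner actually picked a shuffled hash join, not a broadcast
// or sort merge join, under the configs set by withSQLConf.
val shjCount = joined.queryExecution.executedPlan.collect {
  case j: ShuffledHashJoinExec => j
}.size
assert(shjCount === 1)
```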







[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-13 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470091083



##
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##
@@ -601,6 +657,14 @@ public boolean isDefined() {
   return isDefined;
 }
 
+/**
+ * Returns index for key.
+ */
+public int getKeyIndex() {

Review comment:
   See comment above about possibility eliminating this notion of keyIndex 
and sticking with pos.

##
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
 return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+private int keyIndex = 0;
+private int numRecords;
+private final Location loc;
+
+private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+  this.numRecords = numRecords;
+  this.loc = loc;
+}
+
+@Override
+public boolean hasNext() {
+  return numRecords > 0;
+}
+
+@Override
+public Location next() {
+  if (!loc.isDefined() || !loc.nextValue()) {
+while (longArray.get(keyIndex * 2) == 0) {
+  keyIndex++;
+}
+loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+keyIndex++;

Review comment:
   Should there be a bounds check on `numRecords` to ensure that keyIndex won't wrap around? Or is this too internal an iterator to care about that?
   
   Basically, can keyIndex grow beyond longArray.size() if numRecords is sufficiently big?

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##
@@ -116,7 +116,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
*
* - Shuffle hash join:
* Only supported for equi-joins, while the join keys do not need to be 
sortable.
-   * Supported for all join types except full outer joins.
+   * Supported for all join types.
+   * Building hash map from table is a memory-intensive operation and it 
could cause OOM

Review comment:
   Should we add some commentary about what is meant by "hash map" here? Is it the hash map used for storing the matched bits, or the build-side hash table (something true of all hash joins)?

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##
@@ -66,6 +66,30 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
 throw new UnsupportedOperationException
   }
 
+  /**
+   * Returns key index and matched rows.

Review comment:
   Same comment here. I cannot find the concept "key index" anywhere else in the Spark code. Is there another commonly used name for "key index / keyIndex" that we can use?
   
   If not, let's figure out where to define it clearly.
   
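   For illustration only (plain Scala, not Spark code), "key index" here means the slot a key occupies in the map's internal array; the same index comes back from a point lookup and from a full scan, so the two passes can be correlated:

```scala
// Toy open-addressing map that exposes its slot ("key index") on both paths.
class TinyMap(capacity: Int) {
  private val keys = new Array[String](capacity)
  private val vals = new Array[Int](capacity)

  /** Insert and return the key index (slot). Assumes the map never fills up. */
  def put(k: String, v: Int): Int = {
    var i = (k.hashCode & 0x7fffffff) % capacity
    while (keys(i) != null && keys(i) != k) i = (i + 1) % capacity // linear probing
    keys(i) = k; vals(i) = v; i
  }

  /** Point lookup that also returns the key index, like getValueWithKeyIndex. */
  def getWithKeyIndex(k: String): Option[(Int, Int)] = {
    var i = (k.hashCode & 0x7fffffff) % capacity
    while (keys(i) != null) {
      if (keys(i) == k) return Some((i, vals(i)))
      i = (i + 1) % capacity
    }
    None
  }

  /** Full scan that preserves the key index, like valuesWithKeyIndex. */
  def iterateWithKeyIndex(): Iterator[(Int, Int)] =
    keys.indices.iterator.filter(keys(_) != null).map(i => (i, vals(i)))
}
```

   A full outer join can then set a bit per matched key index during the stream-side pass and emit the still-unset build rows during the scan.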

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+case _ => join(streamIter, hashed, numOutputRows)
+  }
+}
+  }
+
+  private def fullOuterJoin(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
+val joinKeys = streamSideKeyGenerator()
+val joinRow = new JoinedRow
+val (joinRowWithStream, joinRowWithBuild) = {
+  buildSide match {
+case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+  }
+}
+val buildNullRow = new GenericInternalRow(buildOutput.length)
+val streamNullRow = new GenericInternalRow(streamedOutput.length)
+val streamNullJoinRow = new JoinedRow
+val streamNullJoinRowWithBuild = {
+  buildSide match {
+case BuildLeft =>
+  streamNullJoinRow.withRight(streamNullRow)
+  streamNullJoinRow.withLeft _
+case BuildRight =>
+  streamNullJoinRow.withLeft(streamNullRow)
+  streamNullJoinRow.withRight _
+  }
+}
+
+val iter = if (hashedRelation.keyIsUnique) {
+  

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-10 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467709017



##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+val inputDFs = Seq(
+  // Test unique join key
+  (spark.range(10).selectExpr("id as k1"),
+spark.range(30).selectExpr("id as k2"),
+$"k1" === $"k2"),
+  // Test non-unique join key
+  (spark.range(10).selectExpr("id % 5 as k1"),
+spark.range(30).selectExpr("id % 5 as k2"),
+$"k1" === $"k2"),
+  // Test string join key
+  (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+spark.range(30).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test build side at right
+  (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+spark.range(10).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test NULL join key
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+$"k1" === $"k2"),
+  // Test multiple join keys
+  (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+"value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+  "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+$"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+)
+inputDFs.foreach { case (df1, df2, joinExprs) =>
+  withSQLConf(
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",

Review comment:
   You have used the phrase 'careful tuning' several times, but I am not fully following: do you mean that the broadcast threshold should be chosen to be smaller than the build side size, so as to NOT trigger a BHJ?
   
   Perhaps add a comment above this line to explain to the reader why the configs are chosen the way they are: for example, why shuffle-partitions is 2, etc.
   
   Btw, is it worth testing with shuffle partitions = 1?
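   Something along these lines is what I mean (sketch only; the 80-byte threshold is the value already in the test, the shuffle-partitions value is from this discussion, and the exact wording is up to you):

```scala
inputDFs.foreach { case (df1, df2, joinExprs) =>
  withSQLConf(
    // Keep the broadcast threshold below the size of either side so the planner
    // cannot choose broadcast hash join...
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
    // ...while threshold * numShufflePartitions stays above the build side size,
    // which is (roughly) the condition for allowing a per-partition hash map,
    // i.e. a shuffled hash join.
    SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
    // build the full outer join and assert on the plan and results here
  }
}
```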

##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {

Review comment:
   Thanks for noticing the gap in the test coverage and explaining it to me! I understand this better now.
   
   But you see where I am coming from :-) I am simply concerned about data corruption (more precisely, wrong result computation).
   
   Ideally, we should first fix the test, land that fix, and then retest this PR with the tests fixed. It seems like it shouldn't be too hard, nor cause any merge conflicts, but it would delay landing this PR.
   
   As a compromise, may I suggest that you:
   - Make the test change mentioned in SPARK-32577 locally and run the SQL tests locally to verify that they pass.
   - Run JoinSuite with code coverage enabled and ensure that all of the code paths changed in this PR are covered.
   
   I recently worked on a join-related feature and I was humbled by how many bugs I managed to write around nulls, strings, and their interaction with downstream union queries :-D
   
   I think the tests in JoinSuite are good, but I am more concerned about the unknown unknowns and any regressions, and about interactions of the join with other operators like aggregations and unions.
   
   This is not a blocker at all! Just due diligence and paranoia.

##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+val inputDFs = Seq(
+  // Test unique join key
+  (spark.range(10).selectExpr("id as k1"),
+spark.range(30).selectExpr("id as k2"),
+$"k1" === $"k2"),
+  // Test non-unique join key
+  (spark.range(10).selectExpr("id % 5 as k1"),
+spark.range(30).selectExpr("id % 5 as k2"),
+$"k1" === $"k2"),
+  // Test string join key
+  (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+spark.range(30).selectExpr("cast(id as string) as k2"),
+$"k1" === $"k2"),
+  // Test build side at right
+  (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-09 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467667250



##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {

Review comment:
   I didn't fully follow: do you mean that until https://issues.apache.org/jira/browse/SPARK-32577 is fixed, we don't have very high confidence that this optimization truly produces the same results as without it?
   
   All I am trying to ascertain is whether this optimization is safe in all cases: would the results produced for a full outer join be identical both with and without this optimization? My understanding is that currently this is only validated by the newly added Scala unit tests above, but it hasn't been fully validated for all full-outer-join scenarios due to https://issues.apache.org/jira/browse/SPARK-32577. Is that an accurate understanding?








[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-08 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r467372338



##
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##
@@ -1188,4 +1188,42 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
 classOf[BroadcastNestedLoopJoinExec]))
 }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {

Review comment:
   I am curious whether it might be possible to unconditionally enable this optimization for _all_ full outer joins, to make sure that it works unconditionally. A similar approach was taken in PR #29104; see `CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=true` in `.sql` files such as `group-by-filter.sql`, for example.

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
   key: Seq[Expression],
   sizeEstimate: Int = 64,
   taskMemoryManager: TaskMemoryManager = null,
-  isNullAware: Boolean = false): HashedRelation = {
+  isNullAware: Boolean = false,
+  isLookupAware: Boolean = false,

Review comment:
   canMarkMatchedRow? Or canMarkLookedupRow?
   
   Stemming from the use of `markRowLookedUp` below.

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
##
@@ -580,4 +580,28 @@ class HashedRelationSuite extends SharedSparkSession {
   assert(proj(packedKeys).get(0, dt) == -i - 1)
 }
   }
+
+  test("SPARK-32399: test values() method for HashedRelation") {
+val key = Seq(BoundReference(0, LongType, false))
+val value = Seq(BoundReference(0, IntegerType, true))
+val unsafeProj = UnsafeProjection.create(value)
+val rows = (0 until 100).map(i => unsafeProj(InternalRow(i + 1)).copy())
+
+// test LongHashedRelation
+val longRelation = LongHashedRelation(rows.iterator, key, 10, mm)
+var values = longRelation.values()
+assert(values.map(_.getInt(0)).toArray.sortWith(_ < _) === (0 until 100).map(i => i + 1))

Review comment:
   This is quite a mouthful to understand (on this line and the sorts on lines 598 and 604 below). Can you add some comments on what this test is trying to assert?
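   For example (sketch of the kind of comment I mean; the assertion itself is just the line above):

```scala
// The relation was built from the values 1..100 ((i + 1) for i in 0 until 100).
// values() returns the stored rows in hash-table order, so sort them before
// comparing against the expected, ordered 1..100 sequence.
val values = longRelation.values()
assert(values.map(_.getInt(0)).toArray.sortWith(_ < _) === (0 until 100).map(i => i + 1))
```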

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+case _ => join(streamIter, hashed, numOutputRows)
+  }
+}
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *with extra boolean value at the end of row to track look up information
+   *(done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *and mark the matched rows from the build side as looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
+abstract class HashJoinedRow extends JoinedRow {
+  /** Updates this JoinedRow by updating its stream side row. Returns 
itself. */
+  def withStream(newStream: InternalRow): JoinedRow
+
+  /** Updates this JoinedRow by updating its build side row. Returns 
itself. */
+  def withBuild(newBuild: InternalRow): JoinedRow
 }
+val joinRow: HashJoinedRow = buildSide match {
+  case BuildLeft =>
+new HashJoinedRow {
+  override def withStream(newStream: InternalRow): JoinedRow = 
withRight(newStream)
+  override def withBuild(newBuild: InternalRow): JoinedRow = 
withLeft(newBuild)
+}
+  case BuildRight =>
+new HashJoinedRow {
+  override def withStream(newStream: InternalRow): JoinedRow = 
withLeft(newStream)
+  override def withBuild(newBuild: InternalRow): JoinedRow = 
withRight(newBuild)
+}
+}
+val joinKeys = streamSideKeyGenerator()
+val buildRowGenerator = UnsafeProjection.create(buildOutput, buildOutput)
+val buildNullRow = new GenericInternalRow(buildOutput.length)
+val streamNullRow = new GenericInternalRow(streamedOutput.length)
+
+def markRowLookedUp(row: UnsafeRow): Unit =
+  row.setBoolean(row.numFields() - 1, true)
+
+// Process stream side with looking up hash relation
+val 

[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-06 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r466738015



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)

Review comment:
   I am not sure that's a good idea: spark.sql.autoBroadcastJoinThreshold is a very widely used config, and I think we should have a separate config to disable just this full outer join optimization, without having to turn off BHJ itself.
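   For example, a dedicated flag could look roughly like this (purely a sketch; the config name, doc text, and default here are made up):

```scala
// Hypothetical entry in org.apache.spark.sql.internal.SQLConf:
val FULL_OUTER_SHUFFLED_HASH_JOIN_ENABLED =
  buildConf("spark.sql.join.fullOuterShuffledHashJoin.enabled")
    .doc("When true, the planner may plan full outer joins as shuffled hash joins. " +
      "When false, full outer joins keep using sort merge join.")
    .booleanConf
    .createWithDefault(true)
```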








[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-06 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r466556531



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##
@@ -76,6 +76,11 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
*/
   def keys(): Iterator[InternalRow]
 
+  /**
+   * Returns an iterator for values of InternalRow type.
+   */
+  def values(): Iterator[InternalRow]

Review comment:
   nit: Down below this is referred to as a build row. Should `values` be renamed to `buildRows`? Or just `rows`?








[GitHub] [spark] agrawaldevesh commented on a change in pull request #29342: [SPARK-32399][SQL] Full outer shuffled hash join

2020-08-06 Thread GitBox


agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r466533520



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##
@@ -314,7 +338,9 @@ private[joins] object UnsafeHashedRelation {
   key: Seq[Expression],
   sizeEstimate: Int,
   taskMemoryManager: TaskMemoryManager,
-  isNullAware: Boolean = false): HashedRelation = {
+  isNullAware: Boolean = false,
+  isLookupAware: Boolean = false,
+  value: Option[Seq[Expression]] = None): HashedRelation = {

Review comment:
   Would `isLookedUp` be a better name for `value`? Is there a way to force the Expression type to be a Boolean?
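   On the second question, one option (purely a sketch with made-up names, not the PR's code) is for the call site to construct the extra column as a typed Boolean literal, so its data type is fixed by construction:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.BooleanType

// Hypothetical "looked up" marker column appended to the build-side value expressions.
val lookedUpMarker: Expression = Literal.create(false, BooleanType)
assert(lookedUpMarker.dataType == BooleanType)
```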

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##
@@ -71,8 +89,134 @@ case class ShuffledHashJoinExec(
 val numOutputRows = longMetric("numOutputRows")
 streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
   val hashed = buildHashedRelation(buildIter)
-  join(streamIter, hashed, numOutputRows)
+  joinType match {
+case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+case _ => join(streamIter, hashed, numOutputRows)
+  }
+}
+  }
+
+  /**
+   * Full outer shuffled hash join has three steps:
+   * 1. Construct hash relation from build side,
+   *with extra boolean value at the end of row to track look up information
+   *(done in `buildHashedRelation`).
+   * 2. Process rows from stream side by looking up hash relation,
+   *and mark the matched rows from the build side as looked up.
+   * 3. Process rows from build side by iterating hash relation,
+   *and filter out rows from build side being looked up already.
+   */
+  private def fullOuterJoin(
+  streamIter: Iterator[InternalRow],
+  hashedRelation: HashedRelation,
+  numOutputRows: SQLMetric): Iterator[InternalRow] = {
+abstract class HashJoinedRow extends JoinedRow {
+  /** Updates this JoinedRow by updating its stream side row. Returns 
itself. */
+  def withStream(newStream: InternalRow): JoinedRow
+
+  /** Updates this JoinedRow by updating its build side row. Returns 
itself. */
+  def withBuild(newBuild: InternalRow): JoinedRow
 }
+val joinRow: HashJoinedRow = buildSide match {
+  case BuildLeft =>
+new HashJoinedRow {
+  override def withStream(newStream: InternalRow): JoinedRow = 
withRight(newStream)
+  override def withBuild(newBuild: InternalRow): JoinedRow = 
withLeft(newBuild)
+}
+  case BuildRight =>
+new HashJoinedRow {
+  override def withStream(newStream: InternalRow): JoinedRow = 
withLeft(newStream)
+  override def withBuild(newBuild: InternalRow): JoinedRow = 
withRight(newBuild)
+}
+}
+val joinKeys = streamSideKeyGenerator()
+val buildRowGenerator = UnsafeProjection.create(buildOutput, buildOutput)
+val buildNullRow = new GenericInternalRow(buildOutput.length)
+val streamNullRow = new GenericInternalRow(streamedOutput.length)
+
+def markRowLookedUp(row: UnsafeRow): Unit =
+  row.setBoolean(row.numFields() - 1, true)
+
+// Process stream side with looking up hash relation
+val streamResultIter =
+  if (hashedRelation.keyIsUnique) {
+streamIter.map { srow =>
+  joinRow.withStream(srow)
+  val keys = joinKeys(srow)
+  if (keys.anyNull) {
+joinRow.withBuild(buildNullRow)
+  } else {
+val matched = hashedRelation.getValue(keys)
+if (matched != null) {
+  val buildRow = buildRowGenerator(matched)
+  if (boundCondition(joinRow.withBuild(buildRow))) {
+markRowLookedUp(matched.asInstanceOf[UnsafeRow])

Review comment:
   As my overall review comment states, I believe this marking does not have to be stored in the hash table. Have you considered using a position list or a bitset (compressed or not) for this? Marked rows are only consulted within this full outer join's RDD closure, so the space for them does not have to be allocated inside the hash table.
   
   (If I am understanding correctly.)
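   Concretely, something along these lines (a sketch using the key-index API that the newer revision quoted earlier in this archive exposes; simplified and eager, unlike the real iterator-based code):

```scala
import org.apache.spark.util.collection.BitSet

// Track matches in a closure-local BitSet keyed by key index, instead of widening
// every row stored in the hash table with an extra boolean column.
val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)

// Stream-side pass: remember which build-side keys found a match.
streamIter.foreach { srow =>
  val keys = joinKeys(srow)
  if (!keys.anyNull) {
    val matched = hashedRelation.getValueWithKeyIndex(keys)
    if (matched != null) matchedKeys.set(matched.getKeyIndex)
  }
}

// Build-side pass: emit only the rows whose key was never matched.
val unmatchedBuildRows = hashedRelation.valuesWithKeyIndex().collect {
  case v if !matchedKeys.get(v.getKeyIndex) => v.getValue
}
```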

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##
@@ -97,7 +102,9 @@ private[execution] object HashedRelation {
   key: Seq[Expression],
   sizeEstimate: Int = 64,
   taskMemoryManager: TaskMemoryManager = null,
-  isNullAware: Boolean = false): HashedRelation = {
+  isNullAware: Boolean = false,
+  isLookupAware: Boolean = false,
+  value: Option[Seq[Expression]] = None): HashedRelation = {

Review comment:
   Please add some documentation for 'value' too.

##
File path: