Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fc2942d12 -> 2946c85f5


[SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

JIRA: https://issues.apache.org/jira/browse/SPARK-11362

We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We 
should use Spark's BitSet.

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #9316 from viirya/use-spark-bitset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2946c85f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2946c85f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2946c85f

Branch: refs/heads/branch-1.6
Commit: 2946c85f5f48516637a6ce52ba9e31caf3c8ee3a
Parents: fc2942d
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Sat Nov 7 19:44:45 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Nov 9 10:02:46 2015 -0800

----------------------------------------------------------------------
 .../execution/joins/BroadcastNestedLoopJoin.scala | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2946c85f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 05d20f5..aab177b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
 
 
 case class BroadcastNestedLoopJoin(
@@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin(
     /** All rows that either match both-way, or rows from streamed joined with 
nulls. */
     val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { 
streamedIter =>
       val matchedRows = new CompactBuffer[InternalRow]
-      // TODO: Use Spark's BitSet.
-      val includedBroadcastTuples =
-        new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
+      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
       val joinedRow = new JoinedRow
 
       val leftNulls = new GenericMutableRow(left.output.size)
@@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin(
             case BuildRight if boundCondition(joinedRow(streamedRow, 
broadcastedRow)) =>
               matchedRows += resultProj(joinedRow(streamedRow, 
broadcastedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case BuildLeft if boundCondition(joinedRow(broadcastedRow, 
streamedRow)) =>
               matchedRows += resultProj(joinedRow(broadcastedRow, 
streamedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case _ =>
           }
           i += 1
@@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin(
 
     val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2)
     val allIncludedBroadcastTuples = includedBroadcastTuples.fold(
-      new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
-    )(_ ++ _)
+      new BitSet(broadcastedRelation.value.size)
+    )(_ | _)
 
     val leftNulls = new GenericMutableRow(left.output.size)
     val rightNulls = new GenericMutableRow(right.output.size)
@@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withLeft(leftNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
               buf += resultProj(joinedRow.withRight(rel(i))).copy()
             }
             i += 1
@@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withRight(rightNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
               buf += resultProj(joinedRow.withLeft(rel(i))).copy()
             }
             i += 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to