Repository: spark
Updated Branches:
  refs/heads/branch-1.5 43b30bc6c -> c39d5d144


[SPARK-9482] [SQL] Fix thread-safety issue of using UnsafeProjection in join

This PR also changes to use `def` instead of `lazy val` for UnsafeProjection, 
because it is not thread-safe.
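
For illustration, a minimal sketch of the problem and the fix, using a
hypothetical stand-in type rather than the real UnsafeProjection API: a
generated projection writes into one reused internal buffer, so a single
instance cached in a `lazy val` and shared across task threads can corrupt
rows, while a `def` hands every caller a fresh, thread-confined instance.

```scala
// Hypothetical stand-in for a generated projection: it reuses one
// internal buffer, so a shared instance is not thread-safe.
class StatefulProjection {
  private val buffer = new StringBuilder
  def apply(row: Seq[Any]): String = {
    buffer.setLength(0)                 // mutates shared state on every call
    row.foreach(v => buffer.append(v))
    buffer.toString
  }
}

trait JoinLike {
  // Unsafe: one instance, created once, then shared by all task threads:
  //   @transient protected lazy val projection = new StatefulProjection
  // Safe: each caller (each task) gets its own instance:
  protected def projection: StatefulProjection = new StatefulProjection
}
```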

TODO: clean up the debug code once the flaky test has passed 100 times.

Author: Davies Liu <dav...@databricks.com>

Closes #7940 from davies/semijoin and squashes the following commits:

93baac7 [Davies Liu] fix outerjoin
5c40ded [Davies Liu] address comments
aa3de46 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
7590a25 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
2d4085b [Davies Liu] use def for resultProjection
0833407 [Davies Liu] Merge branch 'semijoin' of github.com:davies/spark into semijoin
e0d8c71 [Davies Liu] use lazy val
6a59e8f [Davies Liu] Update HashedRelation.scala
0fdacaf [Davies Liu] fix broadcast and thread-safety of UnsafeProjection
2fc3ef6 [Davies Liu] reproduce failure in semijoin

(cherry picked from commit 93085c992e40dbc06714cb1a64c838e25e683a6f)
Signed-off-by: Davies Liu <davies....@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c39d5d14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c39d5d14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c39d5d14

Branch: refs/heads/branch-1.5
Commit: c39d5d144fe051cd5ff716e56a5f57bce0d4913d
Parents: 43b30bc
Author: Davies Liu <dav...@databricks.com>
Authored: Thu Aug 6 09:12:41 2015 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Thu Aug 6 09:12:54 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/joins/BroadcastHashJoin.scala |  6 ++---
 .../joins/BroadcastHashOuterJoin.scala          | 20 ++++++-----------
 .../joins/BroadcastNestedLoopJoin.scala         | 17 +++++++++------
 .../spark/sql/execution/joins/HashJoin.scala    |  4 ++--
 .../sql/execution/joins/HashOuterJoin.scala     | 23 ++++++++++----------
 .../sql/execution/joins/HashSemiJoin.scala      |  8 +++----
 .../sql/execution/joins/HashedRelation.scala    |  4 ++--
 .../execution/joins/ShuffledHashOuterJoin.scala |  6 +++--
 8 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
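
A recurring shape in the hunks below: because the key and result generators
are now `def`s, each partition closure captures one local instance up front
(e.g. `val resultProj = resultProjection`) instead of calling the `def` once
per row. A hedged sketch of that pattern, with simplified stand-in types
rather than the real Spark operator API:

```scala
// Illustrative only: simplified types, not the actual Spark classes.
object PerPartitionCapture {
  // A `def`, so every call returns a fresh, thread-confined projection.
  def resultProjection: Seq[Any] => String = row => row.mkString("|")

  def mapPartition(rows: Iterator[Seq[Any]]): Iterator[String] = {
    val resultProj = resultProjection   // capture once per partition
    rows.map(resultProj)                // reuse for every row in this task
  }
}
```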


http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index ec1a148..f7a68e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.execution.joins
 import scala.concurrent._
 import scala.concurrent.duration._
 
-import org.apache.spark.{InternalAccumulator, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
 import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{InternalAccumulator, TaskContext}
 
 /**
  * :: DeveloperApi ::
@@ -102,6 +102,6 @@ case class BroadcastHashJoin(
 
 object BroadcastHashJoin {
 
-  private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
+  private[joins] val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index e342fd9..a3626de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.joins
 import scala.concurrent._
 import scala.concurrent.duration._
 
-import org.apache.spark.{InternalAccumulator, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.{InternalAccumulator, TaskContext}
 
 /**
  * :: DeveloperApi ::
@@ -76,7 +75,7 @@ case class BroadcastHashOuterJoin(
         val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
         sparkContext.broadcast(hashed)
       }
-    }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+    }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
   }
 
   protected override def doPrepare(): Unit = {
@@ -98,19 +97,20 @@ case class BroadcastHashOuterJoin(
         case _ =>
       }
 
+      val resultProj = resultProjection
       joinType match {
         case LeftOuter =>
           streamedIter.flatMap(currentRow => {
             val rowKey = keyGenerator(currentRow)
             joinedRow.withLeft(currentRow)
-            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey))
+            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj)
           })
 
         case RightOuter =>
           streamedIter.flatMap(currentRow => {
             val rowKey = keyGenerator(currentRow)
             joinedRow.withRight(currentRow)
-            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow)
+            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj)
           })
 
         case x =>
@@ -120,9 +120,3 @@ case class BroadcastHashOuterJoin(
     }
   }
 }
-
-object BroadcastHashOuterJoin {
-
-  private val broadcastHashOuterJoinExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-outer-join", 128))
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 83b726a..23aebf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -47,7 +47,7 @@ case class BroadcastNestedLoopJoin(
   override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows
   override def canProcessUnsafeRows: Boolean = true
 
-  @transient private[this] lazy val resultProjection: InternalRow => InternalRow = {
+  private[this] def genResultProjection: InternalRow => InternalRow = {
     if (outputsUnsafeRows) {
       UnsafeProjection.create(schema)
     } else {
@@ -88,6 +88,7 @@ case class BroadcastNestedLoopJoin(
 
       val leftNulls = new GenericMutableRow(left.output.size)
       val rightNulls = new GenericMutableRow(right.output.size)
+      val resultProj = genResultProjection
 
       streamedIter.foreach { streamedRow =>
         var i = 0
@@ -97,11 +98,11 @@ case class BroadcastNestedLoopJoin(
           val broadcastedRow = broadcastedRelation.value(i)
           buildSide match {
             case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
-              matchedRows += resultProjection(joinedRow(streamedRow, broadcastedRow)).copy()
+              matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
               streamRowMatched = true
               includedBroadcastTuples += i
             case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
-              matchedRows += resultProjection(joinedRow(broadcastedRow, streamedRow)).copy()
+              matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
               streamRowMatched = true
               includedBroadcastTuples += i
             case _ =>
@@ -111,9 +112,9 @@ case class BroadcastNestedLoopJoin(
 
         (streamRowMatched, joinType, buildSide) match {
           case (false, LeftOuter | FullOuter, BuildRight) =>
-            matchedRows += resultProjection(joinedRow(streamedRow, rightNulls)).copy()
+            matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
           case (false, RightOuter | FullOuter, BuildLeft) =>
-            matchedRows += resultProjection(joinedRow(leftNulls, streamedRow)).copy()
+            matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
           case _ =>
         }
       }
@@ -127,6 +128,8 @@ case class BroadcastNestedLoopJoin(
 
     val leftNulls = new GenericMutableRow(left.output.size)
     val rightNulls = new GenericMutableRow(right.output.size)
+    val resultProj = genResultProjection
+
     /** Rows from broadcasted joined with nulls. */
     val broadcastRowsWithNulls: Seq[InternalRow] = {
       val buf: CompactBuffer[InternalRow] = new CompactBuffer()
@@ -138,7 +141,7 @@ case class BroadcastNestedLoopJoin(
           joinedRow.withLeft(leftNulls)
           while (i < rel.length) {
             if (!allIncludedBroadcastTuples.contains(i)) {
-              buf += resultProjection(joinedRow.withRight(rel(i))).copy()
+              buf += resultProj(joinedRow.withRight(rel(i))).copy()
             }
             i += 1
           }
@@ -147,7 +150,7 @@ case class BroadcastNestedLoopJoin(
           joinedRow.withRight(rightNulls)
           while (i < rel.length) {
             if (!allIncludedBroadcastTuples.contains(i)) {
-              buf += resultProjection(joinedRow.withLeft(rel(i))).copy()
+              buf += resultProj(joinedRow.withLeft(rel(i))).copy()
             }
             i += 1
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 6b3d165..5e9cd9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -52,14 +52,14 @@ trait HashJoin {
   override def canProcessUnsafeRows: Boolean = isUnsafeMode
   override def canProcessSafeRows: Boolean = !isUnsafeMode
 
-  @transient protected lazy val buildSideKeyGenerator: Projection =
+  protected def buildSideKeyGenerator: Projection =
     if (isUnsafeMode) {
       UnsafeProjection.create(buildKeys, buildPlan.output)
     } else {
       newMutableProjection(buildKeys, buildPlan.output)()
     }
 
-  @transient protected lazy val streamSideKeyGenerator: Projection =
+  protected def streamSideKeyGenerator: Projection =
     if (isUnsafeMode) {
       UnsafeProjection.create(streamedKeys, streamedPlan.output)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index a323aea..346337e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -76,14 +76,14 @@ trait HashOuterJoin {
   override def canProcessUnsafeRows: Boolean = isUnsafeMode
   override def canProcessSafeRows: Boolean = !isUnsafeMode
 
-  @transient protected lazy val buildKeyGenerator: Projection =
+  protected def buildKeyGenerator: Projection =
     if (isUnsafeMode) {
       UnsafeProjection.create(buildKeys, buildPlan.output)
     } else {
       newMutableProjection(buildKeys, buildPlan.output)()
     }
 
-  @transient protected[this] lazy val streamedKeyGenerator: Projection = {
+  protected[this] def streamedKeyGenerator: Projection = {
     if (isUnsafeMode) {
       UnsafeProjection.create(streamedKeys, streamedPlan.output)
     } else {
@@ -91,7 +91,7 @@ trait HashOuterJoin {
     }
   }
 
-  @transient private[this] lazy val resultProjection: InternalRow => InternalRow = {
+  protected[this] def resultProjection: InternalRow => InternalRow = {
     if (isUnsafeMode) {
       UnsafeProjection.create(self.schema)
     } else {
@@ -113,7 +113,8 @@ trait HashOuterJoin {
   protected[this] def leftOuterIterator(
       key: InternalRow,
       joinedRow: JoinedRow,
-      rightIter: Iterable[InternalRow]): Iterator[InternalRow] = {
+      rightIter: Iterable[InternalRow],
+      resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
     val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = if (rightIter != null) {
@@ -124,12 +125,12 @@ trait HashOuterJoin {
           List.empty
         }
         if (temp.isEmpty) {
-          resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil
+          resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
         } else {
           temp
         }
       } else {
-        resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil
+        resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
       }
     }
     ret.iterator
@@ -138,24 +139,24 @@ trait HashOuterJoin {
   protected[this] def rightOuterIterator(
       key: InternalRow,
       leftIter: Iterable[InternalRow],
-      joinedRow: JoinedRow): Iterator[InternalRow] = {
+      joinedRow: JoinedRow,
+      resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
     val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = if (leftIter != null) {
           leftIter.collect {
-            case l if boundCondition(joinedRow.withLeft(l)) =>
-              resultProjection(joinedRow).copy()
+            case l if boundCondition(joinedRow.withLeft(l)) => resultProjection(joinedRow).copy()
           }
         } else {
           List.empty
         }
         if (temp.isEmpty) {
-          resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil
+          resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
         } else {
           temp
         }
       } else {
-        resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil
+        resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
       }
     }
     ret.iterator

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 97fde8f..47a7d37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -43,14 +43,14 @@ trait HashSemiJoin {
   override def canProcessUnsafeRows: Boolean = supportUnsafe
   override def canProcessSafeRows: Boolean = !supportUnsafe
 
-  @transient protected lazy val leftKeyGenerator: Projection =
+  protected def leftKeyGenerator: Projection =
     if (supportUnsafe) {
       UnsafeProjection.create(leftKeys, left.output)
     } else {
       newMutableProjection(leftKeys, left.output)()
     }
 
-  @transient protected lazy val rightKeyGenerator: Projection =
+  protected def rightKeyGenerator: Projection =
     if (supportUnsafe) {
       UnsafeProjection.create(rightKeys, right.output)
     } else {
@@ -62,12 +62,11 @@ trait HashSemiJoin {
 
   protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
     val hashSet = new java.util.HashSet[InternalRow]()
-    var currentRow: InternalRow = null
 
     // Create a Hash set of buildKeys
     val rightKey = rightKeyGenerator
     while (buildIter.hasNext) {
-      currentRow = buildIter.next()
+      val currentRow = buildIter.next()
       val rowKey = rightKey(currentRow)
       if (!rowKey.anyNull) {
         val keyExists = hashSet.contains(rowKey)
@@ -76,6 +75,7 @@ trait HashSemiJoin {
         }
       }
     }
+
     hashSet
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 58b4236..3f257ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.execution.joins
 
-import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.nio.ByteOrder
 import java.util.{HashMap => JavaHashMap}
 
 import org.apache.spark.shuffle.ShuffleMemoryManager
-import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -31,6 +30,7 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.{SparkConf, SparkEnv}
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c39d5d14/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
index eee8ad8..6a8c35e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -60,19 +60,21 @@ case class ShuffledHashOuterJoin(
         case LeftOuter =>
           val hashed = HashedRelation(rightIter, buildKeyGenerator)
           val keyGenerator = streamedKeyGenerator
+          val resultProj = resultProjection
           leftIter.flatMap( currentRow => {
             val rowKey = keyGenerator(currentRow)
             joinedRow.withLeft(currentRow)
-            leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey))
+            leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
           })
 
         case RightOuter =>
           val hashed = HashedRelation(leftIter, buildKeyGenerator)
           val keyGenerator = streamedKeyGenerator
+          val resultProj = resultProjection
           rightIter.flatMap ( currentRow => {
             val rowKey = keyGenerator(currentRow)
             joinedRow.withRight(currentRow)
-            rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow)
+            rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
           })
 
         case FullOuter =>

