spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0)

2016-10-26 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b482b3d58 -> 76b71eef4


[SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0)

## What changes were proposed in this pull request?

Backport #15520 to 2.0.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #15646 from zsxwing/SPARK-13747-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76b71eef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76b71eef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76b71eef

Branch: refs/heads/branch-2.0
Commit: 76b71eef46a3b932d6de7f831f0245ea27e3dfe7
Parents: b482b3d
Author: Shixiong Zhu 
Authored: Wed Oct 26 13:21:46 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Oct 26 13:21:46 2016 -0700

--
 .../org/apache/spark/util/ThreadUtils.scala | 21 
 scalastyle-config.xml   |  1 +
 .../apache/spark/sql/execution/SparkPlan.scala  |  2 +-
 .../exchange/BroadcastExchangeExec.scala|  3 ++-
 4 files changed, 25 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5a6dbc8..d093e7b 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
 throw new SparkException("Exception thrown in awaitResult: ", t)
 }
   }
+
+  /**
+   * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s 
`BlockingContext`, wraps
+   * and re-throws any exceptions with nice stack track.
+   *
+   * Codes running in the user's thread may be in a thread of Scala 
ForkJoinPool. As concurrent
+   * executions in ForkJoinPool may see some [[ThreadLocal]] value 
unexpectedly, this method
+   * basically prevents ForkJoinPool from running other tasks in the current 
waiting thread.
+   */
+  @throws(classOf[SparkException])
+  def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: 
Duration): T = {
+try {
+  // `awaitPermission` is not actually used anywhere so it's safe to pass 
in null here.
+  // See SPARK-13747.
+  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+  awaitable.result(Duration.Inf)(awaitPermission)
+} catch {
+  case NonFatal(t) =>
+throw new SparkException("Exception thrown in awaitResult: ", t)
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/scalastyle-config.xml
--
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7fe0697..81d57d7 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -200,6 +200,7 @@ This file is divided into 3 sections:
   // scalastyle:off awaitresult
   Await.result(...)
   // scalastyle:on awaitresult
+  If your codes use ThreadLocal and may run in threads created by the 
user, use ThreadUtils.awaitResultInForkJoinSafely instead.
 ]]>
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 79cb409..fa40414 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -166,7 +166,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   protected def waitForSubqueries(): Unit = synchronized {
 // fill in the result of subqueries
 subqueryResults.foreach { case (e, futureResult) =>
-  val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf)
+  val rows = ThreadUtils.awaitResultInForkJoinSafely(futureResult, 
Duration.Inf)
   if (rows.length > 1) {
 sys.error(s"more than one row returned by a subquery used as an 
expression:\n${e.plan}")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
--
diff --git 

spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL

2016-10-26 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 312ea3f7f -> 7ac70e7ba


[SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL

## What changes were proposed in this pull request?

Calling `Await.result` allows other tasks to run on the same thread when
using a ForkJoinPool. However, SQL uses a `ThreadLocal` execution ID to track the
Spark jobs launched by a query, which does not work reliably in a ForkJoinPool.

This PR uses `Awaitable.result` instead, which prevents the ForkJoinPool from
running other tasks on the current waiting thread.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #15520 from zsxwing/SPARK-13747.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ac70e7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ac70e7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ac70e7b

Branch: refs/heads/master
Commit: 7ac70e7ba8d610a45c21a70dc28e4c989c19451b
Parents: 312ea3f
Author: Shixiong Zhu 
Authored: Wed Oct 26 10:36:36 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Oct 26 10:36:36 2016 -0700

--
 .../org/apache/spark/util/ThreadUtils.scala | 21 
 scalastyle-config.xml   |  1 +
 .../sql/execution/basicPhysicalOperators.scala  |  2 +-
 .../exchange/BroadcastExchangeExec.scala|  3 ++-
 4 files changed, 25 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5a6dbc8..d093e7b 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -194,4 +194,25 @@ private[spark] object ThreadUtils {
 throw new SparkException("Exception thrown in awaitResult: ", t)
 }
   }
+
+  /**
+   * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s 
`BlockingContext`, wraps
+   * and re-throws any exceptions with nice stack track.
+   *
+   * Codes running in the user's thread may be in a thread of Scala 
ForkJoinPool. As concurrent
+   * executions in ForkJoinPool may see some [[ThreadLocal]] value 
unexpectedly, this method
+   * basically prevents ForkJoinPool from running other tasks in the current 
waiting thread.
+   */
+  @throws(classOf[SparkException])
+  def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: 
Duration): T = {
+try {
+  // `awaitPermission` is not actually used anywhere so it's safe to pass 
in null here.
+  // See SPARK-13747.
+  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+  awaitable.result(Duration.Inf)(awaitPermission)
+} catch {
+  case NonFatal(t) =>
+throw new SparkException("Exception thrown in awaitResult: ", t)
+}
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/scalastyle-config.xml
--
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7fe0697..81d57d7 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -200,6 +200,7 @@ This file is divided into 3 sections:
   // scalastyle:off awaitresult
   Await.result(...)
   // scalastyle:on awaitresult
+  If your codes use ThreadLocal and may run in threads created by the 
user, use ThreadUtils.awaitResultInForkJoinSafely instead.
 ]]>
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 37d750e..a5291e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -570,7 +570,7 @@ case class SubqueryExec(name: String, child: SparkPlan) 
extends UnaryExecNode {
   }
 
   override def executeCollect(): Array[InternalRow] = {
-ThreadUtils.awaitResult(relationFuture, Duration.Inf)
+ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala