spark git commit: [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Repository: spark Updated Branches: refs/heads/master 0ac8b01a0 -> 47e7ffe36 [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin' Both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` use `scala.concurrent.ExecutionContext.Implicits.global`. However, the tasks in `BroadcastHashJoin` are usually long-running tasks, which will occupy all threads in `global`, so `ask` cannot get a chance to process the replies. For `ask`, actually the tasks are very simple, so we can use `MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to use `ThreadUtils.newDaemonCachedThreadPool`. Author: zsxwing Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits: cfdc605 [zsxwing] Remove redundant import and minor doc fix cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with maxThreadNumber" to ThreadUtils 08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin' Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47e7ffe3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47e7ffe3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47e7ffe3 Branch: refs/heads/master Commit: 47e7ffe36b8a8a246fe9af522aff480d19c0c8a6 Parents: 0ac8b01 Author: zsxwing Authored: Sat May 16 00:44:29 2015 -0700 Committer: Reynold Xin Committed: Sat May 16 00:44:29 2015 -0700 -- .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala | 8 --- .../org/apache/spark/util/ThreadUtils.scala | 24 +++- .../apache/spark/util/ThreadUtilsSuite.scala| 12 ++ .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++-- 4 files changed, 48 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47e7ffe3/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala index ba0d468..0161962 100644 --- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala @@ -29,9 +29,11 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Add import akka.event.Logging.Error import akka.pattern.{ask => akkaAsk} import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import com.google.common.util.concurrent.MoreExecutors + import org.apache.spark.{SparkException, Logging, SparkConf} import org.apache.spark.rpc._ -import org.apache.spark.util.{ActorLogReceive, AkkaUtils} +import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils} /** * A RpcEnv implementation based on Akka. @@ -294,8 +296,8 @@ private[akka] class AkkaRpcEndpointRef( } override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = { -import scala.concurrent.ExecutionContext.Implicits.global actorRef.ask(AkkaMessage(message, true))(timeout).flatMap { + // The function will run in the calling thread, so it should be short and never block. 
case msg @ AkkaMessage(message, reply) => if (reply) { logError(s"Receive $msg but the sender cannot reply") @@ -305,7 +307,7 @@ private[akka] class AkkaRpcEndpointRef( } case AkkaFailure(e) => Future.failed(e) -}.mapTo[T] +}(ThreadUtils.sameThread).mapTo[T] } override def toString: String = s"${getClass.getSimpleName}($actorRef)" http://git-wip-us.apache.org/repos/asf/spark/blob/47e7ffe3/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 098a4b7..ca5624a 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -20,10 +20,22 @@ package org.apache.spark.util import java.util.concurrent._ -import com.google.common.util.concurrent.ThreadFactoryBuilder +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} + +import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} private[spark] object ThreadUtils { + private val sameThreadExecutionContext = +ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor()) + + /** + * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`. + * The caller should make sure the tasks runnin
spark git commit: [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'
Repository: spark Updated Branches: refs/heads/branch-1.4 e7607e5cb -> ad5b0b1ce [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin' Both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` use `scala.concurrent.ExecutionContext.Implicits.global`. However, the tasks in `BroadcastHashJoin` are usually long-running tasks, which will occupy all threads in `global`, so `ask` cannot get a chance to process the replies. For `ask`, actually the tasks are very simple, so we can use `MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to use `ThreadUtils.newDaemonCachedThreadPool`. Author: zsxwing Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits: cfdc605 [zsxwing] Remove redundant import and minor doc fix cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with maxThreadNumber" to ThreadUtils 08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin' (cherry picked from commit 47e7ffe36b8a8a246fe9af522aff480d19c0c8a6) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad5b0b1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad5b0b1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad5b0b1c Branch: refs/heads/branch-1.4 Commit: ad5b0b1ce2a80fda7a50eb728b2a2cfe8775149a Parents: e7607e5 Author: zsxwing Authored: Sat May 16 00:44:29 2015 -0700 Committer: Reynold Xin Committed: Sat May 16 00:44:36 2015 -0700 -- .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala | 8 --- .../org/apache/spark/util/ThreadUtils.scala | 24 +++- .../apache/spark/util/ThreadUtilsSuite.scala| 12 ++ .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++-- 4 files changed, 48 insertions(+), 6 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/ad5b0b1c/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala index ba0d468..0161962 100644 --- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala @@ -29,9 +29,11 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, ActorRef, Props, Add import akka.event.Logging.Error import akka.pattern.{ask => akkaAsk} import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent} +import com.google.common.util.concurrent.MoreExecutors + import org.apache.spark.{SparkException, Logging, SparkConf} import org.apache.spark.rpc._ -import org.apache.spark.util.{ActorLogReceive, AkkaUtils} +import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils} /** * A RpcEnv implementation based on Akka. @@ -294,8 +296,8 @@ private[akka] class AkkaRpcEndpointRef( } override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = { -import scala.concurrent.ExecutionContext.Implicits.global actorRef.ask(AkkaMessage(message, true))(timeout).flatMap { + // The function will run in the calling thread, so it should be short and never block. 
case msg @ AkkaMessage(message, reply) => if (reply) { logError(s"Receive $msg but the sender cannot reply") @@ -305,7 +307,7 @@ private[akka] class AkkaRpcEndpointRef( } case AkkaFailure(e) => Future.failed(e) -}.mapTo[T] +}(ThreadUtils.sameThread).mapTo[T] } override def toString: String = s"${getClass.getSimpleName}($actorRef)" http://git-wip-us.apache.org/repos/asf/spark/blob/ad5b0b1c/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 098a4b7..ca5624a 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -20,10 +20,22 @@ package org.apache.spark.util import java.util.concurrent._ -import com.google.common.util.concurrent.ThreadFactoryBuilder +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} + +import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} private[spark] object ThreadUtils { + private val sameThreadExecutionContext = +ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor()) + + /** + * An `ExecutionContextExecutor` that runs