spark git commit: [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'

2015-05-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 0ac8b01a0 -> 47e7ffe36


[SPARK-7655][Core][SQL] Remove 
'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 
'BroadcastHashJoin'

Both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` use 
`scala.concurrent.ExecutionContext.Implicits.global`. Because the tasks in 
`BroadcastHashJoin` are usually long-running, they can occupy all threads in 
`global`, so `ask` never gets a chance to process the replies.

For `ask`, actually the tasks are very simple, so we can use 
`MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to 
use `ThreadUtils.newDaemonCachedThreadPool`.

Author: zsxwing 

Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits:

cfdc605 [zsxwing] Remove redundant import and minor doc fix
cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with 
maxThreadNumber" to ThreadUtils
08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' 
in 'ask' and 'BroadcastHashJoin'


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47e7ffe3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47e7ffe3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47e7ffe3

Branch: refs/heads/master
Commit: 47e7ffe36b8a8a246fe9af522aff480d19c0c8a6
Parents: 0ac8b01
Author: zsxwing 
Authored: Sat May 16 00:44:29 2015 -0700
Committer: Reynold Xin 
Committed: Sat May 16 00:44:29 2015 -0700

--
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |  8 ---
 .../org/apache/spark/util/ThreadUtils.scala | 24 +++-
 .../apache/spark/util/ThreadUtilsSuite.scala| 12 ++
 .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++--
 4 files changed, 48 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47e7ffe3/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index ba0d468..0161962 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -29,9 +29,11 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, 
ActorRef, Props, Add
 import akka.event.Logging.Error
 import akka.pattern.{ask => akkaAsk}
 import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import com.google.common.util.concurrent.MoreExecutors
+
 import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils}
 
 /**
  * A RpcEnv implementation based on Akka.
@@ -294,8 +296,8 @@ private[akka] class AkkaRpcEndpointRef(
   }
 
   override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): 
Future[T] = {
-import scala.concurrent.ExecutionContext.Implicits.global
 actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+  // The function will run in the calling thread, so it should be short 
and never block.
   case msg @ AkkaMessage(message, reply) =>
 if (reply) {
   logError(s"Receive $msg but the sender cannot reply")
@@ -305,7 +307,7 @@ private[akka] class AkkaRpcEndpointRef(
 }
   case AkkaFailure(e) =>
 Future.failed(e)
-}.mapTo[T]
+}(ThreadUtils.sameThread).mapTo[T]
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"

http://git-wip-us.apache.org/repos/asf/spark/blob/47e7ffe3/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 098a4b7..ca5624a 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -20,10 +20,22 @@ package org.apache.spark.util
 
 import java.util.concurrent._
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
 private[spark] object ThreadUtils {
 
+  private val sameThreadExecutionContext =
+ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+  /**
+   * An `ExecutionContextExecutor` that runs each task in the thread that 
invokes `execute/submit`.
+   * The caller should make sure the tasks runnin

spark git commit: [SPARK-7655][Core][SQL] Remove 'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 'BroadcastHashJoin'

2015-05-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 e7607e5cb -> ad5b0b1ce


[SPARK-7655][Core][SQL] Remove 
'scala.concurrent.ExecutionContext.Implicits.global' in 'ask' and 
'BroadcastHashJoin'

Both `AkkaRpcEndpointRef.ask` and `BroadcastHashJoin` use 
`scala.concurrent.ExecutionContext.Implicits.global`. Because the tasks in 
`BroadcastHashJoin` are usually long-running, they can occupy all threads in 
`global`, so `ask` never gets a chance to process the replies.

For `ask`, actually the tasks are very simple, so we can use 
`MoreExecutors.sameThreadExecutor()`. For `BroadcastHashJoin`, it's better to 
use `ThreadUtils.newDaemonCachedThreadPool`.

Author: zsxwing 

Closes #6200 from zsxwing/SPARK-7655-2 and squashes the following commits:

cfdc605 [zsxwing] Remove redundant import and minor doc fix
cf83153 [zsxwing] Add "sameThread" and "newDaemonCachedThreadPool with 
maxThreadNumber" to ThreadUtils
08ad0ee [zsxwing] Remove 'scala.concurrent.ExecutionContext.Implicits.global' 
in 'ask' and 'BroadcastHashJoin'

(cherry picked from commit 47e7ffe36b8a8a246fe9af522aff480d19c0c8a6)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad5b0b1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad5b0b1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad5b0b1c

Branch: refs/heads/branch-1.4
Commit: ad5b0b1ce2a80fda7a50eb728b2a2cfe8775149a
Parents: e7607e5
Author: zsxwing 
Authored: Sat May 16 00:44:29 2015 -0700
Committer: Reynold Xin 
Committed: Sat May 16 00:44:36 2015 -0700

--
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |  8 ---
 .../org/apache/spark/util/ThreadUtils.scala | 24 +++-
 .../apache/spark/util/ThreadUtilsSuite.scala| 12 ++
 .../sql/execution/joins/BroadcastHashJoin.scala | 10 ++--
 4 files changed, 48 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad5b0b1c/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index ba0d468..0161962 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -29,9 +29,11 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, Actor, 
ActorRef, Props, Add
 import akka.event.Logging.Error
 import akka.pattern.{ask => akkaAsk}
 import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, 
AssociationErrorEvent}
+import com.google.common.util.concurrent.MoreExecutors
+
 import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils}
 
 /**
  * A RpcEnv implementation based on Akka.
@@ -294,8 +296,8 @@ private[akka] class AkkaRpcEndpointRef(
   }
 
   override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): 
Future[T] = {
-import scala.concurrent.ExecutionContext.Implicits.global
 actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+  // The function will run in the calling thread, so it should be short 
and never block.
   case msg @ AkkaMessage(message, reply) =>
 if (reply) {
   logError(s"Receive $msg but the sender cannot reply")
@@ -305,7 +307,7 @@ private[akka] class AkkaRpcEndpointRef(
 }
   case AkkaFailure(e) =>
 Future.failed(e)
-}.mapTo[T]
+}(ThreadUtils.sameThread).mapTo[T]
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"

http://git-wip-us.apache.org/repos/asf/spark/blob/ad5b0b1c/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 098a4b7..ca5624a 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -20,10 +20,22 @@ package org.apache.spark.util
 
 import java.util.concurrent._
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 
 private[spark] object ThreadUtils {
 
+  private val sameThreadExecutionContext =
+ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+  /**
+   * An `ExecutionContextExecutor` that runs