Repository: spark
Updated Branches:
  refs/heads/branch-1.4 be66d1924 -> 2a42d2d8f


[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"

A lesson learned from SPARK-7655: Spark should avoid using
`scala.concurrent.ExecutionContext.Implicits.global`, because users may
submit blocking actions to `scala.concurrent.ExecutionContext.Implicits.global`
and exhaust all of its threads. This could crash Spark. So Spark should
always use its own thread pools for safety.
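
For illustration only, a minimal stand-alone sketch of the pattern adopted
here: blocking work runs on a pool owned by the component itself. The object
name and pool size below are hypothetical; inside Spark the pools are created
with `ThreadUtils.newDaemonCachedThreadPool(name, 128)`, as the diff shows.

    import java.util.concurrent.Executors

    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    object DedicatedPoolSketch {
      // A pool owned by this component; blocking tasks submitted here cannot
      // exhaust threads shared with unrelated callbacks.
      private val futureExecutionContext = ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(8))

      def main(args: Array[String]): Unit = {
        val f = Future {
          Thread.sleep(100) // stand-in for a blocking action such as reregister()
          42
        }(futureExecutionContext)
        println(Await.result(f, Duration.Inf))
        futureExecutionContext.shutdownNow() // released on stop, as in BlockManager.stop()
      }
    }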

This PR removes all usages of
`scala.concurrent.ExecutionContext.Implicits.global` and replaces them with
dedicated thread pools.
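
For the quick, non-blocking continuations, the PR passes Spark's internal
`ThreadUtils.sameThread` instead of relying on the global context. A
stand-alone approximation (the name `SameThreadSketch` is hypothetical) runs
the callback directly on the completing thread, which is only safe when the
callback never blocks:

    import java.util.concurrent.Executor

    import scala.concurrent.{ExecutionContext, Promise}
    import scala.util.{Failure, Success}

    object SameThreadSketch {
      // The callback executes on whichever thread completes the future, so it
      // must be fast and must not block.
      val sameThreadContext: ExecutionContext = ExecutionContext.fromExecutor(
        new Executor { def execute(r: Runnable): Unit = r.run() })

      def main(args: Array[String]): Unit = {
        val p = Promise[String]()
        p.future.onComplete {
          case Success(msg) => println(s"registered: $msg")
          case Failure(e)   => println(s"registration failed: $e")
        }(sameThreadContext)
        p.success("RegisteredExecutor")
      }
    }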

Author: zsxwing <zsxw...@gmail.com>

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"

(cherry picked from commit ff71d34e00b64d70f671f9bf3e63aec39cd525e5)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a42d2d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a42d2d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a42d2d8

Branch: refs/heads/branch-1.4
Commit: 2a42d2d8f26e3e66ab8c926e952a20a3900ca7f3
Parents: be66d19
Author: zsxwing <zsxw...@gmail.com>
Authored: Sun May 17 20:37:19 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun May 17 20:37:27 2015 -0700

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  9 +++---
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 13 +++++++--
 .../org/apache/spark/storage/BlockManager.scala | 17 +++++++++---
 .../spark/storage/BlockManagerMaster.scala      | 29 ++++++++++++--------
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../streaming/receiver/ReceiverSupervisor.scala | 14 +++++++---
 6 files changed, 58 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed159de..f3a26f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
 
   override def onStart() {
-    import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
       ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-    } onComplete {
+    }(ThreadUtils.sameThread).onComplete {
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
       }
      case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
-    }
+    }(ThreadUtils.sameThread)
   }
 
   def extractLogUrls: Map[String, String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ec18534..bbf1b83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     val f = new ComplexFutureAction[Seq[T]]
 
     f.run {
+      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+      // is a cached thread pool.
       val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         partsScanned += numPartsToTry
       }
       results.toSeq
-    }
+    }(AsyncRDDActions.futureExecutionContext)
 
     f
   }
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => Unit, Unit)
   }
 }
+
+private object AsyncRDDActions {
+  val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index cc794e5..16d67cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
 import scala.util.Random
 
@@ -77,6 +76,9 @@ private[spark] class BlockManager(
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
 
+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
+
   // Actual storage of where blocks are kept
   private var externalBlockStoreInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
     asyncReregisterLock.synchronized {
       if (asyncReregisterTask == null) {
         asyncReregisterTask = Future[Unit] {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
           reregister()
           asyncReregisterLock.synchronized {
             asyncReregisterTask = null
           }
-        }
+        }(futureExecutionContext)
       }
     }
   }
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
       case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, putLevel) }
+        Future {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, putLevel)
+        }(futureExecutionContext)
       case _ => null
     }
 
@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
     }
     metadataCleaner.cancel()
     broadcastCleaner.cancel()
+    futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index a85e1c7..abcad94 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.storage
 
+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
 
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
 private[spark]
 class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -114,8 +115,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -128,8 +129,8 @@ class BlockManagerMaster(
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove broadcast $broadcastId" +
-          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
-    }
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -169,11 +170,17 @@ class BlockManagerMaster(
     val response = driverEndpoint.
       askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
-    if (result == null) {
+    implicit val sameThread = ThreadUtils.sameThread
+    val cbf =
+      implicitly[
+        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+        Option[BlockStatus],
+        Iterable[Option[BlockStatus]]]]
+    val blockStatus = Await.result(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+    if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
     blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
       status.map { s => (blockManagerId, s) }
     }.toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fe43fc4..b8b12be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -78,5 +78,5 @@ case class BroadcastHashJoin(
 object BroadcastHashJoin {
 
   private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 4943f29..33be067 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.streaming.receiver
 
 import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
   // Attach the executor to the receiver
   receiver.attachExecutor(this)
 
+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
   /** Receiver id */
   protected val streamId = receiver.streamId
 
@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
     stoppingError = error.orNull
     stopReceiver(message, error)
     onStop(message, error)
+    futureExecutionContext.shutdownNow()
     stopLatch.countDown()
   }
 
@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Restart receiver with delay */
   def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
     Future {
+      // This is a blocking action so we should use "futureExecutionContext" which is a cached
+      // thread pool.
       logWarning("Restarting receiver with delay " + delay + " ms: " + message,
         error.getOrElse(null))
      stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
       logInfo("Starting receiver again")
       startReceiver()
       logInfo("Receiver started again")
-    }
+    }(futureExecutionContext)
   }
 
   /** Check if receiver has been marked for stopping */

