Repository: spark
Updated Branches:
  refs/heads/master 26503fdf2 -> 1f4a648d4


SPARK-1713. Use a thread pool for launching executors.

This patch copies the approach used in the MapReduce application master for 
launching containers.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #663 from sryza/sandy-spark-1713 and squashes the following commits:

036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor 
containers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f4a648d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f4a648d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f4a648d

Branch: refs/heads/master
Commit: 1f4a648d4e30e837d6cf3ea8de1808e2254ad70b
Parents: 26503fd
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Wed Sep 10 14:34:24 2014 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Sep 10 14:34:24 2014 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                               |  7 +++++++
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala  | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f4a648d/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 943f06b..d8b22f3 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
      the environment of the executor launcher. 
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.containerLauncherMaxThreads</code></td>
+  <td>25</td>
+  <td>
+    The maximum number of threads to use in the application master for launching executor containers.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/1f4a648d/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 02b9a81..0b8744f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.util.{List => JList}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 object AllocationType extends Enumeration {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
   protected val (preferredHostToCount, preferredRackToCount) =
     generateNodeToWeight(conf, preferredNodes)
 
+  private val launcherPool = new ThreadPoolExecutor(
+    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+    1, TimeUnit.MINUTES,
+    new LinkedBlockingQueue[Runnable](),
+    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+  launcherPool.allowCoreThreadTimeOut(true)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
             executorMemory,
             executorCores,
             securityMgr)
-          new Thread(executorRunnable).start()
+          launcherPool.execute(executorRunnable)
         }
       }
       logDebug("""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to