Repository: spark Updated Branches: refs/heads/master 26503fdf2 -> 1f4a648d4
SPARK-1713. Use a thread pool for launching executors. This patch copies the approach used in the MapReduce application master for launching containers. Author: Sandy Ryza <sa...@cloudera.com> Closes #663 from sryza/sandy-spark-1713 and squashes the following commits: 036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor containers Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f4a648d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f4a648d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f4a648d Branch: refs/heads/master Commit: 1f4a648d4e30e837d6cf3ea8de1808e2254ad70b Parents: 26503fd Author: Sandy Ryza <sa...@cloudera.com> Authored: Wed Sep 10 14:34:24 2014 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Wed Sep 10 14:34:24 2014 -0500 ---------------------------------------------------------------------- docs/running-on-yarn.md | 7 +++++++ .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1f4a648d/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 943f06b..d8b22f3 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes the environment of the executor launcher. </td> </tr> +<tr> + <td><code>spark.yarn.containerLauncherMaxThreads</code></td> + <td>25</td> + <td> + The maximum number of threads to use in the application master for launching executor containers. + </td> +</tr> </table> # Launching Spark on YARN http://git-wip-us.apache.org/repos/asf/spark/blob/1f4a648d/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 02b9a81..0b8744f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.yarn import java.util.{List => JList} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ @@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import com.google.common.util.concurrent.ThreadFactoryBuilder + object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value @@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator( protected val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, preferredNodes) + private val launcherPool = new ThreadPoolExecutor( + // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue + sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, + 1, TimeUnit.MINUTES, + new LinkedBlockingQueue[Runnable](), + new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) + launcherPool.allowCoreThreadTimeOut(true) + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue def getNumExecutorsFailed: Int = numExecutorsFailed.intValue @@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator( executorMemory, executorCores, securityMgr) - new Thread(executorRunnable).start() + launcherPool.execute(executorRunnable) } } logDebug(""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org