Repository: spark
Updated Branches:
  refs/heads/master d8d50ed38 -> 353c30bd7


[SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate 
the codes

This bug is introduced in 
[SPARK-9092](https://issues.apache.org/jira/browse/SPARK-9092), 
`targetExecutorNumber` should use `minExecutors` if `initialExecutors` is not 
set. Using 0 instead will meet the problem as mentioned in 
[SPARK-10790](https://issues.apache.org/jira/browse/SPARK-10790).

Also consolidate and simplify some similar code snippets to keep the consistent 
semantics.

Author: jerryshao <ss...@hortonworks.com>

Closes #8910 from jerryshao/SPARK-10790.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/353c30bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/353c30bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/353c30bd

Branch: refs/heads/master
Commit: 353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b
Parents: d8d50ed
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Sep 28 06:38:54 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Sep 28 06:38:54 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ClientArguments.scala     | 20 +----------------
 .../spark/deploy/yarn/YarnAllocator.scala       |  6 +----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 23 ++++++++++++++++++++
 .../cluster/YarnClusterSchedulerBackend.scala   | 18 ++-------------
 4 files changed, 27 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 54f62e6..1165061 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], 
sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial 
number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    if (isDynamicAllocationEnabled) {
-      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, 
minNumExecutors)
-      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, 
Integer.MAX_VALUE)
-
-      // If defined, initial executors must be between min and max
-      if (initialNumExecutors < minNumExecutors || initialNumExecutors > 
maxNumExecutors) {
-        throw new IllegalArgumentException(
-          s"$initialExecutorsConf must be between $minExecutorsConf and 
$maxNumExecutors!")
-      }
-
-      numExecutors = initialNumExecutors
-    } else {
-      val numExecutorsConf = "spark.executor.instances"
-      numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-    }
+    numExecutors = 
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull

http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index fd88b8b..9e1ef1b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", 
YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
 
   // Executor loss reason requests that are pending - maps from executor ID 
for inquiry to a
   // list of requesters that should be responded to once we find out why the 
given executor

http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 445d3dc..f276e7e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /**
+   * Getting the initial target number of executors depends on whether dynamic 
allocation is
+   * enabled.
+   */
+  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = 
conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", 
minNumExecutors)
+      val maxNumExecutors = 
conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= 
maxNumExecutors,
+        s"initial executor number $initialNumExecutors must between min 
executor number" +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        
sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/353c30bd/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 1aed5a1..50b699f 100644
--- 
a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,21 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.net.NetworkInterface
-
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = 
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", 
totalExpectedExecutors)
+    totalExpectedExecutors = 
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
   }
 
   override def applicationId(): String =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to