Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b28295fe0 -> 8537e51d3


[SPARK-9092] Fixed incompatibility when both num-executors and dynamic...

… allocation are set. Now, dynamic allocation is set to false when
num-executors is explicitly specified as an argument. Consequently, the
executorAllocationManager is not initialized in the SparkContext.
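
A minimal sketch of the resulting behavior, mirroring the regression test
added in SparkContextSuite below (executorAllocationManager is
private[spark], so this runs as written only from Spark's own test code):

    import org.apache.spark.{SparkConf, SparkContext}

    // Both knobs set: previously the YARN code paths threw an
    // IllegalArgumentException; with this patch the explicit count wins.
    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", "6")
    val sc = new SparkContext(conf)
    assert(sc.executorAllocationManager.isEmpty)                  // dynamic allocation off
    assert(sc.getConf.getInt("spark.executor.instances", 0) == 6) // explicit count kept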

Author: Niranjan Padmanabhan <niranjan.padmanab...@cloudera.com>

Closes #7657 from neurons/SPARK-9092.

(cherry picked from commit 738f353988dbf02704bd63f5e35d94402c59ed79)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8537e51d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8537e51d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8537e51d

Branch: refs/heads/branch-1.5
Commit: 8537e51d39f693c58732b07ceb6b4ad308d5a0ba
Parents: b28295f
Author: Niranjan Padmanabhan <niranjan.padmanab...@cloudera.com>
Authored: Wed Aug 12 16:10:21 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Aug 12 16:10:43 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala  | 19 +++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala    |  6 +++++-
 .../org/apache/spark/deploy/SparkSubmit.scala    |  4 ++--
 .../main/scala/org/apache/spark/util/Utils.scala | 11 +++++++++++
 .../org/apache/spark/SparkContextSuite.scala     |  8 ++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala   |  1 -
 docs/running-on-yarn.md                          |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala    |  4 ++--
 .../deploy/yarn/ApplicationMasterArguments.scala |  5 -----
 .../org/apache/spark/deploy/yarn/Client.scala    |  5 ++++-
 .../spark/deploy/yarn/ClientArguments.scala      |  8 +-------
 .../apache/spark/deploy/yarn/YarnAllocator.scala |  9 ++++++++-
 .../cluster/YarnClientSchedulerBackend.scala     |  3 ---
 .../spark/deploy/yarn/YarnAllocatorSuite.scala   |  5 +++--
 14 files changed, 64 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 8ff154f..b344b5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -389,6 +389,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     val driverOptsKey = "spark.driver.extraJavaOptions"
     val driverClassPathKey = "spark.driver.extraClassPath"
     val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+    val sparkExecutorInstances = "spark.executor.instances"
 
     // Used by Yarn in 1.1 and before
     sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -476,6 +477,24 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         }
       }
     }
+
+    if (!contains(sparkExecutorInstances)) {
+      sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
+        val warning =
+          s"""
+             |SPARK_WORKER_INSTANCES was detected (set to '$value').
+             |This is deprecated in Spark 1.0+.
+             |
+             |Please instead use:
+             | - ./spark-submit with --num-executors to specify the number of executors
+             | - Or set SPARK_EXECUTOR_INSTANCES
+             | - spark.executor.instances to configure the number of instances in the spark config.
+        """.stripMargin
+        logWarning(warning)
+
+        set("spark.executor.instances", value)
+      }
+    }
   }
 
   /**

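Net effect of the SparkConf change above: the legacy SPARK_WORKER_INSTANCES
environment variable is migrated into spark.executor.instances (with a
deprecation warning) instead of being wired through --num-executors later.
A hedged sketch, assuming SPARK_WORKER_INSTANCES=4 was exported and the
property was never set explicitly:

    // validateSettings() runs when a SparkContext is constructed
    conf.get("spark.executor.instances")  // "4", after the warning is logged
    // An explicit spark.executor.instances setting suppresses the migration.
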
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6aafb4c..207a0c1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,7 +528,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
 
     // Optionally scale number of executors dynamically based on workload. Exposed for testing.
-    val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
+    if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+    }
+
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
         Some(new ExecutorAllocationManager(this, listenerBus, _conf))

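The new guard only logs; no configuration is rejected. Case breakdown of the
replaced boolean (a sketch based on the helper added to Utils.scala further
down):

    // dynamicAllocation.enabled=true,  executor.instances unset -> enabled, manager created
    // dynamicAllocation.enabled=true,  executor.instances=6     -> disabled, the logInfo fires
    // dynamicAllocation.enabled unset                           -> disabled, nothing logged
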
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7ac6cbc..02fa308 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,7 +422,8 @@ object SparkSubmit {
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.instances"),
       OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
       OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -433,7 +434,6 @@
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
-      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
       OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
       OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),

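Net effect of the two assigner changes: in yarn-cluster mode, --num-executors
is now forwarded as the spark.executor.instances property in the submitted
SparkConf rather than as a --num-executors flag on the ApplicationMaster
command line (hence the assertion dropped from SparkSubmitSuite below), while
yarn-client mode keeps its existing property-based behavior.
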
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c4012d0..a90d854 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,17 @@ private[spark] object Utils extends Logging {
     isInDirectory(parent, child.getParentFile)
   }
 
+  /**
+   * Return whether dynamic allocation is enabled in the given conf.
+   * Dynamic allocation and explicitly setting the number of executors are inherently
+   * incompatible. In environments where dynamic allocation is turned on by default,
+   * the latter should override the former (SPARK-9092).
+   */
+  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+    conf.contains("spark.dynamicAllocation.enabled") &&
+      conf.getInt("spark.executor.instances", 0) == 0
+  }
+
 }
 
 private [util] class SparkShutdownHookManager {

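A usage sketch for the new helper (Utils is private[spark], so this is
illustrative rather than user-facing API):

    val conf = new SparkConf(false)                     // skip loading system properties
    Utils.isDynamicAllocationEnabled(conf)              // false: flag never set
    conf.set("spark.dynamicAllocation.enabled", "true")
    Utils.isDynamicAllocationEnabled(conf)              // true: no explicit executor count
    conf.set("spark.executor.instances", "6")
    Utils.isDynamicAllocationEnabled(conf)              // false: explicit count overrides
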
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 5c57940..d4f2ea8 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,4 +285,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("No exception when both num-executors and dynamic allocation set.") {
+    noException should be thrownBy {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
+      assert(sc.executorAllocationManager.isEmpty)
+      assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 757e0ce..2456c5d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -159,7 +159,6 @@ class SparkSubmitSuite
     childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
     childArgsStr should include ("--queue thequeue")
-    childArgsStr should include ("--num-executors 6")
     childArgsStr should include regex ("--jar .*thejar.jar")
     childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
     childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index cac08a9..ec32c41 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -199,7 +199,7 @@ If you need a reference to the proper location to put log files in the YARN so t
  <td><code>spark.executor.instances</code></td>
   <td>2</td>
   <td>
-    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>.
+    The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1d67b3e..e19940d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,7 +64,8 @@ private[spark] class ApplicationMaster(
 
   // Default to numExecutors * 2, with minimum of 3
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+    sparkConf.getInt("spark.yarn.max.worker.failures",
+      math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
 
   @volatile private var exitCode = 0
   @volatile private var unregistered = false
@@ -493,7 +494,6 @@ private[spark] class ApplicationMaster(
    */
   private def startUserApplication(): Thread = {
     logInfo("Starting the user application in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
 
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>

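Worked example of the reworked failure tolerance above (instance counts
assumed for illustration):

    // spark.executor.instances = 10  -> max(10 * 2, 3) = 20 executor failures tolerated
    // spark.executor.instances unset -> max(0 * 2, 3)  = 3  (the dynamic allocation case)
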
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 37f7937..b084124 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -29,7 +29,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Nil
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var propertiesFile: String = null
 
   parseArgs(args.toList)
@@ -63,10 +62,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userArgsBuffer += value
           args = tail
 
-        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail 
=>
-          numExecutors = value
-          args = tail
-
         case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) 
:: tail =>
           executorMemory = value
           args = tail

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b4ba3f0..6d63dda 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -751,7 +751,6 @@ private[spark] class Client(
         userArgs ++ Seq(
           "--executor-memory", args.executorMemory.toString + "m",
           "--executor-cores", args.executorCores.toString,
-          "--num-executors ", args.numExecutors.toString,
           "--properties-file", 
buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
             LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
 
@@ -960,6 +959,10 @@ object Client extends Logging {
     val sparkConf = new SparkConf
 
     val args = new ClientArguments(argStrings, sparkConf)
+    // to maintain backwards-compatibility
+    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+      sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+    }
     new Client(args, sparkConf).run()
   }
 

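The setIfMissing above keeps the bare yarn.Client entry point working; a
sketch of the two paths through it:

    // "--num-executors 4" with dynamic allocation off:
    //   spark.executor.instances is filled in with "4" unless already set.
    // spark.dynamicAllocation.enabled=true (and no explicit instance count):
    //   the guard skips setIfMissing and dynamic allocation stays in control.
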
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 20d63d4..4f42ffe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -53,8 +53,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
   private val driverCoresKey = "spark.driver.cores"
   private val amCoresKey = "spark.yarn.am.cores"
-  private val isDynamicAllocationEnabled =
-    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+  private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
 
   parseArgs(args.toList)
   loadEnvironmentArgs()
@@ -196,11 +195,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors 
instead.")
           }
-          // Dynamic allocation is not compatible with this option
-          if (isDynamicAllocationEnabled) {
-            throw new IllegalArgumentException("Explicitly setting the number 
" +
-              "of executors is not compatible with 
spark.dynamicAllocation.enabled!")
-          }
           numExecutors = value
           args = tail
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 59caa78..ccf753e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,6 +21,8 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
+import org.apache.spark.util.Utils
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
@@ -86,7 +88,12 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
-  @volatile private var targetNumExecutors = args.numExecutors
+  @volatile private var targetNumExecutors =
+    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
+    } else {
+      sparkConf.getInt("spark.executor.instances", 
YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
+    }
 
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.

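The allocator's starting target is now derived purely from the conf; a quick
case breakdown (DEFAULT_NUMBER_EXECUTORS is 2 in YarnSparkHadoopUtil):

    // dynamic allocation on:  target = spark.dynamicAllocation.initialExecutors (default 0)
    // dynamic allocation off: target = spark.executor.instances,
    //                         defaulting to YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS (2)
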
http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d225061..d06d951 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -81,8 +81,6 @@ private[spark] class YarnClientSchedulerBackend(
     // List of (target Client argument, environment variable, Spark property)
     val optionTuples =
       List(
-        ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.executor.instances"),
-        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
         ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
         ("--executor-memory", "SPARK_EXECUTOR_MEMORY", 
"spark.executor.memory"),
         ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
@@ -92,7 +90,6 @@ private[spark] class YarnClientSchedulerBackend(
       )
     // Warn against the following deprecated environment variables: env var -> suggestion
     val deprecatedEnvVars = Map(
-      "SPARK_WORKER_INSTANCES" -> "SPARK_WORKER_INSTANCES or --num-executors through spark-submit",
       "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
       "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
     optionTuples.foreach { case (optionName, envVar, sparkProp) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8537e51d/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 58318bf..5d05f51 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -87,16 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
     val args = Array(
-      "--num-executors", s"$maxExecutors",
       "--executor-cores", "5",
       "--executor-memory", "2048",
       "--jar", "somejar.jar",
       "--class", "SomeClass")
+    val sparkConfClone = sparkConf.clone()
+    sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
     new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
       conf,
-      sparkConf,
+      sparkConfClone,
       rmClient,
       appAttemptId,
       new ApplicationMasterArguments(args),

