Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7d9bd951b -> 90f0e8132


[SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors

## What changes were proposed in this pull request?

Currently, if `spark.dynamicAllocation.initialExecutors` is less than
`spark.dynamicAllocation.minExecutors`, Spark silently falls back to
minExecutors without any warning, whereas Spark 1.6 threw an exception for
this configuration. This change adds a warning log when these parameters
are configured inconsistently.

## How was this patch tested?

Unit test added to verify the scenario.

Author: jerryshao <ss...@hortonworks.com>

Closes #14149 from jerryshao/SPARK-16435.

(cherry picked from commit d8220c1e5e94abbdb9643672b918f0d748206db9)
Signed-off-by: Tom Graves <tgra...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90f0e813
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90f0e813
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90f0e813

Branch: refs/heads/branch-2.0
Commit: 90f0e8132bb40158d6d1b6be77e6b512d837466b
Parents: 7d9bd95
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jul 13 13:24:47 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Jul 13 13:25:05 2016 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++++-
 .../scala/org/apache/spark/util/UtilsSuite.scala |  3 +++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90f0e813/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 156cf17..a79d195 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2338,10 +2338,27 @@ private[spark] object Utils extends Logging {
    * Return the initial number of executors for dynamic allocation.
    */
   def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
-    Seq(
+    if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+      logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+          "please update your configs.")
+    }
+
+    if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+      logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+          "please update your configs.")
+    }
+
+    val initialExecutors = Seq(
       conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
       conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
       conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+
+    logInfo(s"Using initial executors = $initialExecutors, max of " +
+      s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
+        s"${EXECUTOR_INSTANCES.key}")
+    initialExecutors
   }
 
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/90f0e813/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5d0fb0..30952a9 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
       conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "2")
+        .set("spark.executor.instances", "1")) === 3)
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to