Repository: spark
Updated Branches:
  refs/heads/master f376c3726 -> d8220c1e5


[SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors

## What changes were proposed in this pull request?

Currently if `spark.dynamicAllocation.initialExecutors` is less than 
`spark.dynamicAllocation.minExecutors`, Spark will automatically pick the 
minExecutors without any warning. While in 1.6 Spark will throw exception if 
configured like this. So here propose to add warning log if these parameters 
are configured invalidly.

## How was this patch tested?

Unit test added to verify the scenario.

Author: jerryshao <ss...@hortonworks.com>

Closes #14149 from jerryshao/SPARK-16435.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8220c1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8220c1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8220c1e

Branch: refs/heads/master
Commit: d8220c1e5e94abbdb9643672b918f0d748206db9
Parents: f376c37
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jul 13 13:24:47 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Jul 13 13:24:47 2016 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++++-
 .../scala/org/apache/spark/util/UtilsSuite.scala |  3 +++
 2 files changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8220c1e/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 298e624..2e4ec4c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2342,10 +2342,27 @@ private[spark] object Utils extends Logging {
    * Return the initial number of executors for dynamic allocation.
    */
   def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = {
-    Seq(
+    if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+      logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " +
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+          "please update your configs.")
+    }
+
+    if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) {
+      logWarning(s"${EXECUTOR_INSTANCES.key} less than " +
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " +
+          "please update your configs.")
+    }
+
+    val initialExecutors = Seq(
       conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
       conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
       conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
+
+    logInfo(s"Using initial executors = $initialExecutors, max of " +
+      s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " +
+        s"${EXECUTOR_INSTANCES.key}")
+    initialExecutors
   }
 
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d8220c1e/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5d0fb0..30952a9 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors
       conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+    assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors
+      conf.set("spark.dynamicAllocation.initialExecutors", "2")
+        .set("spark.executor.instances", "1")) === 3)
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to