Repository: spark Updated Branches: refs/heads/master f376c3726 -> d8220c1e5
[SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors ## What changes were proposed in this pull request? Currently if `spark.dynamicAllocation.initialExecutors` is less than `spark.dynamicAllocation.minExecutors`, Spark will automatically pick the minExecutors without any warning. While in 1.6 Spark will throw exception if configured like this. So here propose to add warning log if these parameters are configured invalidly. ## How was this patch tested? Unit test added to verify the scenario. Author: jerryshao <ss...@hortonworks.com> Closes #14149 from jerryshao/SPARK-16435. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8220c1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8220c1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8220c1e Branch: refs/heads/master Commit: d8220c1e5e94abbdb9643672b918f0d748206db9 Parents: f376c37 Author: jerryshao <ss...@hortonworks.com> Authored: Wed Jul 13 13:24:47 2016 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Wed Jul 13 13:24:47 2016 -0500 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++++- .../scala/org/apache/spark/util/UtilsSuite.scala | 3 +++ 2 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d8220c1e/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 298e624..2e4ec4c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2342,10 +2342,27 @@ private[spark] object Utils extends Logging { * Return the initial number of executors for dynamic allocation. */ def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { - Seq( + if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${EXECUTOR_INSTANCES.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + val initialExecutors = Seq( conf.get(DYN_ALLOCATION_MIN_EXECUTORS), conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max + + logInfo(s"Using initial executors = $initialExecutors, max of " + + s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " + + s"${EXECUTOR_INSTANCES.key}") + initialExecutors } def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/d8220c1e/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f5d0fb0..30952a9 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4) assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5) + assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors + conf.set("spark.dynamicAllocation.initialExecutors", "2") + .set("spark.executor.instances", "1")) === 3) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org