Repository: spark Updated Branches: refs/heads/branch-2.0 7d9bd951b -> 90f0e8132
[SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors ## What changes were proposed in this pull request? Currently if `spark.dynamicAllocation.initialExecutors` is less than `spark.dynamicAllocation.minExecutors`, Spark will automatically pick the minExecutors without any warning. While in 1.6 Spark will throw exception if configured like this. So here propose to add warning log if these parameters are configured invalidly. ## How was this patch tested? Unit test added to verify the scenario. Author: jerryshao <ss...@hortonworks.com> Closes #14149 from jerryshao/SPARK-16435. (cherry picked from commit d8220c1e5e94abbdb9643672b918f0d748206db9) Signed-off-by: Tom Graves <tgra...@yahoo-inc.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90f0e813 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90f0e813 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90f0e813 Branch: refs/heads/branch-2.0 Commit: 90f0e8132bb40158d6d1b6be77e6b512d837466b Parents: 7d9bd95 Author: jerryshao <ss...@hortonworks.com> Authored: Wed Jul 13 13:24:47 2016 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Wed Jul 13 13:25:05 2016 -0500 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/util/Utils.scala | 19 ++++++++++++++++++- .../scala/org/apache/spark/util/UtilsSuite.scala | 3 +++ 2 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/90f0e813/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 156cf17..a79d195 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2338,10 +2338,27 @@ private[spark] object Utils extends Logging { * Return the initial number of executors for dynamic allocation. */ def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { - Seq( + if (conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + if (conf.get(EXECUTOR_INSTANCES).getOrElse(0) < conf.get(DYN_ALLOCATION_MIN_EXECUTORS)) { + logWarning(s"${EXECUTOR_INSTANCES.key} less than " + + s"${DYN_ALLOCATION_MIN_EXECUTORS.key} is invalid, ignoring its setting, " + + "please update your configs.") + } + + val initialExecutors = Seq( conf.get(DYN_ALLOCATION_MIN_EXECUTORS), conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS), conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max + + logInfo(s"Using initial executors = $initialExecutors, max of " + + s"${DYN_ALLOCATION_INITIAL_EXECUTORS.key}, ${DYN_ALLOCATION_MIN_EXECUTORS.key} and " + + s"${EXECUTOR_INSTANCES.key}") + initialExecutors } def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { http://git-wip-us.apache.org/repos/asf/spark/blob/90f0e813/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f5d0fb0..30952a9 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -782,6 +782,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4) assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5) + assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors + conf.set("spark.dynamicAllocation.initialExecutors", "2") + .set("spark.executor.instances", "1")) === 3) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org