This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f75b4  [SPARK-27192][CORE] spark.task.cpus should be less than or equal to spark.executor.cores
39f75b4 is described below

commit 39f75b45889951b175c48b2e849e920d0a0eee6e
Author: liulijia <liutang...@yeah.net>
AuthorDate: Fri Apr 5 13:55:57 2019 -0500

    [SPARK-27192][CORE] spark.task.cpus should be less than or equal to spark.executor.cores
    
    ## What changes were proposed in this pull request?
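    Check spark.task.cpus against the number of cores available per executor before creating the TaskScheduler in SparkContext, and fail fast with a SparkException when it is larger. For local masters the limit is the thread count; for cluster masters it is spark.executor.cores when that is set. This replaces the earlier checks in SparkConf and ExecutorAllocationManager.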
    
    ## How was this patch tested?
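    Unit tests: a new SparkContextSuite test covers local, local[N], local[N, M], standalone, local-cluster and YARN masters. TaskSchedulerImplSuite tests that use 2 CPUs per task now run with a local[2] master.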
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24261 from liutang123/SPARK-27192.
    
    Authored-by: liulijia <liutang...@yeah.net>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  3 ---
 .../main/scala/org/apache/spark/SparkConf.scala    | 10 ---------
 .../main/scala/org/apache/spark/SparkContext.scala | 24 ++++++++++++++++++++++
 .../scala/org/apache/spark/SparkConfSuite.scala    |  7 -------
 .../scala/org/apache/spark/SparkContextSuite.scala | 21 +++++++++++++++++++
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 22 +++++++++++++++-----
 6 files changed, 62 insertions(+), 25 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 60d0404..6fade10 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -222,9 +222,6 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException("Dynamic allocation of executors requires the 
external " +
         "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
     }
-    if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < 
${CPUS_PER_TASK.key}.")
-    }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
       throw new SparkException(
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4117aea..913a170 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -575,16 +575,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
       }
     }
 
-    if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
-      val executorCores = get(EXECUTOR_CORES)
-      val taskCpus = get(CPUS_PER_TASK)
-
-      if (executorCores < taskCpus) {
-        throw new SparkException(
-          s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
-      }
-    }
-
     val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || 
get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4abb18d..8b74435 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2665,8 +2665,27 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
+    // SPARK-26340: Ensure that executor's core num meets at least one task 
requirement.
+    def checkCpusPerTask(
+      clusterMode: Boolean,
+      maxCoresPerExecutor: Option[Int]): Unit = {
+      val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
+      if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
+        if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
+          throw new SparkException(s"${CPUS_PER_TASK.key}" +
+            s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
+        }
+      } else if (maxCoresPerExecutor.isDefined) {
+        if (maxCoresPerExecutor.get < cpusPerTask) {
+          throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores 
available per executor" +
+            s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+        }
+      }
+    }
+
     master match {
       case "local" =>
+        checkCpusPerTask(clusterMode = false, Some(1))
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, 
isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
@@ -2679,6 +2698,7 @@ object SparkContext extends Logging {
         if (threadCount <= 0) {
           throw new SparkException(s"Asked to run locally with $threadCount 
threads")
         }
+        checkCpusPerTask(clusterMode = false, Some(threadCount))
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, 
isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 
threadCount)
         scheduler.initialize(backend)
@@ -2689,12 +2709,14 @@ object SparkContext extends Logging {
         // local[*, M] means the number of cores on the computer with M 
failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
+        checkCpusPerTask(clusterMode = false, Some(threadCount))
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = 
true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 
threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
       case SPARK_REGEX(sparkUrl) =>
+        checkCpusPerTask(clusterMode = true, None)
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
@@ -2702,6 +2724,7 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        checkCpusPerTask(clusterMode = true, Some(coresPerSlave.toInt))
         // Check to make sure memory requested <= memoryPerSlave. Otherwise 
Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
         if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2722,6 +2745,7 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case masterUrl =>
+        checkCpusPerTask(clusterMode = true, None)
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
           case None => throw new SparkException("Could not parse Master URL: 
'" + master + "'")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 9f759a5..0795790 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -140,13 +140,6 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     assert(sc.appName === "My other app")
   }
 
-  test("creating SparkContext with cpus per tasks bigger than cores per 
executors") {
-    val conf = new SparkConf(false)
-      .set(EXECUTOR_CORES, 1)
-      .set(CPUS_PER_TASK, 2)
-    intercept[SparkException] { sc = new SparkContext(conf) }
-  }
-
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
     System.setProperty("spark.test.a", "a")
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 7a16f7b..3490eaf 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -710,6 +710,27 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       assert(runningTaskIds.isEmpty)
     }
   }
+
+  test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") {
+    val FAIL_REASON = s"${CPUS_PER_TASK.key} must be <="
+    Seq(
+      ("local", 2, None),
+      ("local[2]", 3, None),
+      ("local[2, 1]", 3, None),
+      ("spark://test-spark-cluster", 2, Option(1)),
+      ("local-cluster[1, 1, 1000]", 2, Option(1)),
+      ("yarn", 2, Option(1))
+    ).foreach { case (master, cpusPerTask, executorCores) =>
+      val conf = new SparkConf()
+      conf.set(CPUS_PER_TASK, cpusPerTask)
+      executorCores.map(executorCores => conf.set(EXECUTOR_CORES, 
executorCores))
+      val ex = intercept[SparkException] {
+        sc = new SparkContext(master, "test", conf)
+      }
+      assert(ex.getMessage.contains(FAIL_REASON))
+      resetSparkContext()
+    }
+  }
 }
 
 object SparkContextSuite {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 1a81f55..115b203 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
   }
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-    val conf = new 
SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+    setupSchedulerWithMaster("local", confs: _*)
+  }
+
+  def setupSchedulerWithMaster(master: String, confs: (String, String)*): 
TaskSchedulerImpl = {
+    val conf = new 
SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
@@ -155,7 +159,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", 
"host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -185,7 +191,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)), 0, 0, 0, null)
@@ -1241,7 +1249,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("don't schedule for a barrier taskSet if available slots are less than 
pending tasks") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
@@ -1258,7 +1268,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("schedule tasks for a barrier taskSet if all tasks can be launched 
together") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to