[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22001

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209662081

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -402,6 +422,19 @@ class DAGScheduler(
     }
   }
+  /**
+   * Check whether the barrier stage requires more slots (to be able to launch all tasks in the
+   * barrier stage together) than the total number of active slots currently. Fail current check
+   * if trying to submit a barrier stage that requires more slots than current total number. If
+   * the check fails consecutively for three times for a job, then fail current job submission.
--- End diff --

I can't seem to find the code for `"consecutively for three times"`, only `maxFailureNumTasksCheck`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209658945

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

@kiszk IIUC, there is exactly one thread in `eventLoop`, so the scenario mentioned above cannot happen. I even feel there is no need to use `ConcurrentHashMap` for `jobIdToNumTasksCheckFailures` at all. @jiangxb1987
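Ngone51's argument rests on `DAGScheduler` handling these events on a single event-loop thread. A minimal sketch of why that makes a plain, non-thread-safe map sufficient, assuming a single-threaded `ExecutorService` as a stand-in for the event loop (this is not Spark's `EventLoop` class, and `SingleThreadedCounter`/`countFailures` are hypothetical names): every increment, and the final read, runs on the same thread, so no update can be lost.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.mutable

// Hypothetical stand-in for a single-threaded event loop: every handler
// runs on the one thread owned by this executor, in submission order.
object SingleThreadedCounter {
  def countFailures(jobId: Int, events: Int): Int = {
    val loop = Executors.newSingleThreadExecutor()
    // Plain mutable map: only the loop thread ever reads or writes it.
    val failures = mutable.HashMap.empty[Int, Int]
    for (_ <- 1 to events) {
      loop.execute(new Runnable {
        // Read-modify-write is safe here because no other thread runs handlers.
        override def run(): Unit =
          failures(jobId) = failures.getOrElse(jobId, 0) + 1
      })
    }
    // Read the result on the loop thread too; Future.get() publishes it safely.
    val result = loop.submit(new Callable[Int] {
      override def call(): Int = failures.getOrElse(jobId, 0)
    }).get()
    loop.shutdown()
    result
  }
}
```

The trade-off is that this guarantee only holds while every access stays on the loop thread; the moment another thread touches the map, an atomic structure is needed again.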
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460397

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage.contains(
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+          "than the total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
+          override def apply(key: Int, value: Int): Int = value + 1
+        })
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
+        if (numCheckFailures <= maxFailureNumTasksCheck) {
+          messageScheduler.schedule(
+            new Runnable {
+              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                partitions, callSite, listener, properties))
+            },
+            timeIntervalNumTasksCheck * 1000,
--- End diff --

minor: how about removing `1000` and changing the time unit to `SECONDS`?
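A sketch of mengxr's suggestion: pass the interval in seconds straight through with `TimeUnit.SECONDS` instead of multiplying by 1000 and using `MILLISECONDS`. It assumes a plain `ScheduledExecutorService` in place of the `messageScheduler` from the diff, and a hypothetical `resubmit` callback standing in for posting `JobSubmitted`.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

object RetryScheduling {
  // Hypothetical helper: run `resubmit` once after `intervalSeconds`.
  // TimeUnit.SECONDS handles the unit, so there is no manual `* 1000`.
  def scheduleResubmit(
      scheduler: ScheduledExecutorService,
      intervalSeconds: Long)(resubmit: () => Unit): ScheduledFuture[_] =
    scheduler.schedule(new Runnable {
      override def run(): Unit = resubmit()
    }, intervalSeconds, TimeUnit.SECONDS)
}
```

Keeping the unit next to the value also avoids the class of bugs where the multiplier and the `TimeUnit` drift apart in a later edit.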
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460279

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage.contains(
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+          "than the total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
--- End diff --

minor: There should be an inline comment that mentions the implicit conversion from `null` to `0: Int` to handle new keys.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209460309

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,11 +963,38 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage.contains(
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) =>
+        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
+          "than the total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
+          override def apply(key: Int, value: Int): Int = value + 1
+        })
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId)
--- End diff --

minor: This is already the return value of `compute`, so we don't need `get`.
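The two minor notes above can be sketched together: a comment explaining why a new key starts at 0 (Scala unboxes the `null` that `compute` passes for a missing key to `0` for an `Int` parameter), and using the value `compute` returns instead of a second `get`. This assumes a standalone map in a hypothetical `NumTasksCheckFailures` object rather than the `DAGScheduler` field itself.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hypothetical standalone holder for the per-job counter.
object NumTasksCheckFailures {
  val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]()

  def increment(jobId: Int): Int =
    // For a job with no entry yet, `compute` passes a null value; Scala unboxes
    // that null to 0 for the `Int` parameter, so the first call stores 1.
    // `compute` returns the new value, so no separate `get` is needed.
    jobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] {
      override def apply(key: Int, value: Int): Int = value + 1
    })
}
```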
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209304798

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+        if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
--- End diff --

We should make `DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES` configurable so users can specify unlimited retries if needed. Instead, we might want to fix the timeout, since it only affects cost.
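One way to model a configurable limit that also allows unlimited retries, sketched with a hypothetical `Option[Int]` where `None` means "retry forever". The config key and its exact semantics in the PR may differ; the `<=` comparison mirrors the later revision's `numCheckFailures <= maxFailureNumTasksCheck`.

```scala
object RetryPolicy {
  // Hypothetical policy: `maxFailures = None` means unlimited retries,
  // `Some(n)` allows up to n consecutive check failures before giving up.
  def shouldRetry(numCheckFailures: Int, maxFailures: Option[Int]): Boolean =
    maxFailures.forall(max => numCheckFailures <= max)
}
```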
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209294774

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
+  override def maxNumConcurrentTasks(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

Link to a JIRA.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209276818

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

+1. Use atomic updates from `ConcurrentHashMap`: update the counter first, then check the max failures.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209277357

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
+        if (numCheckFailures < DAGScheduler.DEFAULT_MAX_CONSECUTIVE_NUM_TASKS_CHECK_FAILURES) {
+          jobIdToNumTasksCheckFailures.put(jobId, numCheckFailures)
+          messageScheduler.schedule(
+            new Runnable {
+              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                partitions, callSite, listener, properties))
+            },
+            timeIntervalNumTasksCheck * 1000,
+            TimeUnit.MILLISECONDS
+          )
+          return
+        } else {
+          listener.jobFailed(e)
--- End diff --

Do you expect the same job to be submitted again? If not, we should remove the key from the hashmap.
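A minimal sketch of the cleanup mengxr asks about: once a job has failed for good it will not be resubmitted, so its counter entry is removed rather than left in the map indefinitely. `CheckFailureBookkeeping`, `onCheckFailure`, and `onCheckPassed` are hypothetical names; the sketch uses `ConcurrentHashMap.merge`, which stores the given value for a new key and never passes `null` to the remapping function.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

object CheckFailureBookkeeping {
  // Hypothetical standalone copy of the per-job failure counter.
  val failures = new ConcurrentHashMap[Int, Int]()

  /** Returns true if the job should be resubmitted, false if it has failed for good. */
  def onCheckFailure(jobId: Int, maxFailures: Int): Boolean = {
    // merge stores 1 for a new key and otherwise adds 1, atomically per key.
    val numCheckFailures = failures.merge(jobId, 1, new BiFunction[Int, Int, Int] {
      override def apply(old: Int, inc: Int): Int = old + inc
    })
    if (numCheckFailures <= maxFailures) {
      true
    } else {
      // The job will not be resubmitted, so drop its entry instead of leaking it.
      failures.remove(jobId)
      false
    }
  }

  /** Hypothetical hook: clear the entry once the check passes and the stage is created. */
  def onCheckPassed(jobId: Int): Unit = failures.remove(jobId)
}
```

Clearing on both paths (final failure and eventual success) keeps the map bounded by the number of jobs that are currently waiting for slots.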
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209274833

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
--- End diff --

`==` -> `.contains()`, in case the error message is nested.
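The difference between the two matchers, sketched with a hypothetical placeholder message (the real constant lives in `DAGScheduler` and its text is not reproduced here). One extra observation: a bare `e.getMessage.contains(...)`, as in the later revision, throws a `NullPointerException` for an exception whose message is `null`, so the sketch wraps it in `Option`.

```scala
object BarrierErrorMatching {
  // Hypothetical placeholder for
  // DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER.
  val RequireMoreSlots = "barrier stage requires more slots than currently available"

  // `==` only matches when the message is exactly the constant.
  def matchesExactly(e: Throwable): Boolean = e.getMessage == RequireMoreSlots

  // `contains` also matches when the constant is wrapped in a longer message.
  // Option(...) guards against exceptions whose getMessage is null.
  def matchesNested(e: Throwable): Boolean =
    Option(e.getMessage).exists(_.contains(RequireMoreSlots))
}
```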
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209272468

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
--- End diff --

"a ... failure"
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209275201

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
--- End diff --

Please include the jobId, stageId, requested slots, and total slots in the log message.
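A sketch of a log line carrying the four values requested above. `BarrierLogMessages.moreSlotsThanAvailable` and its parameter names are hypothetical; in the scheduler the string would be passed to `logWarning`.

```scala
object BarrierLogMessages {
  // Hypothetical helper; parameter names are illustrative, not from the PR.
  def moreSlotsThanAvailable(jobId: Int, stageId: Int, requiredSlots: Int, totalSlots: Int): String =
    s"Barrier stage $stageId in job $jobId requires $requiredSlots slots, " +
      s"but only $totalSlots slots are currently available in the cluster."
}
```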
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209277632

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -203,6 +203,17 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+  /**
+   * Number of max concurrent tasks check failures for each job.
+   */
+  private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
--- End diff --

How do entries in this map get cleaned up?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209273451

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
+        "check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
+        "tasks than required by a barrier stage on job submitted. The check can fail in case " +
+        "a cluster has just started and not enough executors have registered, so we wait for a " +
+        "little while and try to perform the check again. If the check fails consecutively for " +
+        "three times for a job then fail current job submission. Note this config only applies " +
+        "to jobs that contain one or more barrier stages, we won't perform the check on " +
+        "non-barrier jobs.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("10s")
--- End diff --

Would you make the default higher, like `30s`? This is to cover the case where an application starts immediately with a barrier stage while the master is still adding new executors. Let me know if this won't happen.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r209294652

--- Diff: core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala ---
@@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     assert(smaller.size === 4)
   }
+  test("compute max number of concurrent tasks can be launched") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    assert(sc.maxNumConcurrentTasks() == 12)
+  }
+
+  test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    // Each executor can only launch one task since `spark.task.cpus` is 2.
+    assert(sc.maxNumConcurrentTasks() == 4)
+  }
+
+  test("compute max number of concurrent tasks can be launched when some executors are busy") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
+      Thread.sleep(1000)
+      iter
+    }
+    var taskStarted = new AtomicBoolean(false)
+    var taskEnded = new AtomicBoolean(false)
+    val listener = new SparkListener() {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStarted.set(true)
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskEnded.set(true)
+      }
+    }
+
+    try {
+      sc.addSparkListener(listener)
+      eventually(timeout(10.seconds)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 4)
+      }
+
+      // Submit a job to trigger some tasks on active executors.
+      testSubmitJob(sc, rdd)
+
+      eventually(timeout(5.seconds)) {
--- End diff --

Maybe it's safer to let the task sleep longer and cancel the task once the conditions are met.
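The pattern mengxr suggests, sketched without Spark: the "task" sleeps far longer than the test needs, the test polls for the condition it cares about, and then cancels the task right away instead of racing a short fixed sleep. This assumes a plain thread interrupt stands in for cancellation; in the actual suite the cancellation would go through Spark (for example `sc.cancelAllJobs()`). `CancelOnceStarted` is a hypothetical name.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CancelOnceStarted {
  /** Starts a long-sleeping "task", waits until it reports that it started, then
    * cancels it. Returns true if the task observed the cancellation. */
  def run(maxWaitMillis: Long): Boolean = {
    val started = new AtomicBoolean(false)
    val cancelled = new AtomicBoolean(false)
    val task = new Thread(new Runnable {
      override def run(): Unit = {
        started.set(true)
        // Far longer than the test needs; only the interrupt ends it early.
        // If the interrupt arrives before sleep() is entered, sleep() throws at once.
        try Thread.sleep(60000)
        catch { case _: InterruptedException => cancelled.set(true) }
      }
    })
    task.start()
    // Poll for the condition instead of relying on a short fixed sleep.
    val deadline = System.currentTimeMillis() + maxWaitMillis
    while (!started.get() && System.currentTimeMillis() < deadline) Thread.sleep(10)
    task.interrupt() // cancel as soon as the condition is met
    task.join(maxWaitMillis)
    cancelled.get()
  }
}
```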
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208950122

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
--- End diff --

Is it OK that this increment is not atomic? In the following scenario, the value may not be correct:

1. Assume `jobIdToNumTasksCheckFailures(jobId) = 1`.
2. Thread A executes L963, so `numCheckFailures = 2`.
3. Thread B executes L963, so `numCheckFailures = 2`.
4. Thread B executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` becomes 2.
5. Thread A executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` becomes 2.

Since two threads detected a failure, we expect `3`, but it is `2`.
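If the counter were updated from several threads, an atomic per-key update closes the lost-update window described above. A sketch, assuming a hypothetical `AtomicIncrementDemo` that hammers one key from a thread pool with `ConcurrentHashMap.merge`: unlike `get(...) + 1` followed by `put(...)`, no increment can be overwritten, so the final count is exact.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.function.BiFunction

object AtomicIncrementDemo {
  // Applies `perThread` increments to one key from each of `threads` threads.
  def incrementConcurrently(threads: Int, perThread: Int): Int = {
    val counts = new ConcurrentHashMap[Int, Int]()
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(threads)
    for (_ <- 1 to threads) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          for (_ <- 1 to perThread) {
            // merge is atomic per key: the read and the write cannot interleave
            // with another thread's update, so no increment is lost.
            counts.merge(0, 1, new BiFunction[Int, Int, Int] {
              override def apply(a: Int, b: Int): Int = a + b
            })
          }
          done.countDown()
        }
      })
    }
    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    counts.get(0)
  }
}
```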
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208947201

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -203,6 +203,17 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+  /**
+   * Number of max concurrent tasks check failures for each job.
+   */
+  private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
+
+  /**
+   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
--- End diff --

nit: `a max` -> `max`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208946523

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -577,4 +577,17 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
--- End diff --

nit: `a max` -> `max`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208945843

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
+  /**
+   * Get the max number of tasks that can be concurrent launched currently.
--- End diff --

How about something like this?

```
   * Get the max number of tasks that can be concurrently launched when the method is called.
   * Note that please don't cache the value returned by this method, because the number can be
   * changed due to adding/removing executors.
```
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208067280

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
+  override def getNumSlots(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

@jiangxb1987 Could you create a JIRA and link it here?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066204

--- Diff: core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala ---
@@ -185,4 +185,56 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
   }
+
--- End diff --

We need a test that verifies that when the total number of slots is sufficient but some slots are busy running other jobs, we don't fail the barrier job.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208067143

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -402,6 +403,18 @@ class DAGScheduler(
     }
   }
+  /**
+   * Check whether the barrier stage requires more slots (to be able to launch all tasks in the
+   * barrier stage together) than the total number of active slots currently. Fail fast if trying
+   * to submit a barrier stage that requires more slots than current total number.
+   */
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+    if (rdd.isBarrier() && rdd.getNumPartitions > sc.getNumSlots) {
+      throw new SparkException(
--- End diff --

We should tolerate temporary unavailability here by adding a wait.
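A sketch of "tolerate temporary unavailability" as a bounded re-check: the slot count is re-read on every attempt because it can grow as executors register. `BarrierSlotCheck.waitForSlots` and its parameters are hypothetical. Note that this version blocks the caller with `Thread.sleep`; a scheduler event loop should not block, which is why the later revisions in this thread instead reschedule the `JobSubmitted` event after an interval.

```scala
object BarrierSlotCheck {
  /** Hypothetical sketch: check up to `maxAttempts` times, sleeping
    * `intervalMillis` between checks, before declaring the stage unlaunchable.
    * `availableSlots` is re-evaluated on each attempt. */
  def waitForSlots(
      requiredSlots: Int,
      availableSlots: () => Int,
      maxAttempts: Int,
      intervalMillis: Long): Boolean = {
    var attempt = 1
    var ok = requiredSlots <= availableSlots()
    while (!ok && attempt < maxAttempts) {
      Thread.sleep(intervalMillis)
      attempt += 1
      ok = requiredSlots <= availableSlots()
    }
    ok
  }
}
```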
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066484

--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -653,6 +653,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .setMaster("local-cluster[3, 1, 1024]")
--- End diff --

Add a unit test for `getNumSlots`.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208065883

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
+  override def getNumSlots(): Int = {
+    executorDataMap.values.foldLeft(0) { (num, executor) =>
+      num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

~~~scala
executorDataMap.values.map { executor =>
  executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
~~~
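Both forms compute the same total; the suggested one reads as "slots per executor, summed". A sketch with a hypothetical `ExecutorInfo` case class standing in for Spark's per-executor data. The division happens per executor, so leftover cores on different executors never combine into an extra slot: four executors with 3 cores each and 2 CPUs per task yield 4 slots, not 6, matching the `local-cluster[4, 3, 1024]` expectations in the test suite above.

```scala
object SlotCount {
  // Hypothetical minimal stand-in for Spark's per-executor bookkeeping.
  final case class ExecutorInfo(totalCores: Int)

  // Style of the PR's first revision.
  def viaFoldLeft(executors: Iterable[ExecutorInfo], cpusPerTask: Int): Int =
    executors.foldLeft(0) { (num, executor) => num + executor.totalCores / cpusPerTask }

  // Suggested style: map each executor to its slot count, then sum.
  def viaMapSum(executors: Iterable[ExecutorInfo], cpusPerTask: Int): Int =
    executors.map { executor => executor.totalCores / cpusPerTask }.sum
}
```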
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208065660

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1597,6 +1597,15 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
+  /**
+   * Get the number of currently active slots (total number of tasks can be launched currently).
+   * Note that please don't cache the value returned by this method, because the number can change
+   * due to add/remove executors.
+   *
+   * @return The number of tasks can be launched currently.
+   */
+  private[spark] def getNumSlots(): Int = schedulerBackend.getNumSlots()
--- End diff --

How about `maxConcurrentTasks`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r208066391

--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -653,6 +653,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .setMaster("local-cluster[3, 1, 1024]")
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
+    eventually(timeout(5.seconds)) {
--- End diff --

Move this wait code to the barrier suite, because it is only required there.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207834660

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
+  override def getNumSlots(): Int = {
+    executorDataMap.values.foldLeft(0) { (num, executor) =>
+      num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

As mentioned in the method description of `SchedulerBackend.getNumSlots()`:

```
  * Note that please don't cache the value returned by this method, because the number can change
  * due to add/remove executors.
```

It should be fine to cache the value within the different stages of a job, but that requires a few more changes that would make the current PR more complicated.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207833047

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
+  override def getNumSlots(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

Only `MesosFineGrainedSchedulerBackend` would break; we still support `MesosCoarseGrainedSchedulerBackend`.
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745122

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
+  override def getNumSlots(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

But fine-grained mode is being deprecated...
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745108

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     super.applicationId
   }
+  override def getNumSlots(): Int = {
+    // TODO support this method for MesosFineGrainedSchedulerBackend
--- End diff --

So this breaks barrier execution on Mesos completely? (Since the number of available slots is 0, it will just fail.)
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22001#discussion_r207745157

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
+  override def getNumSlots(): Int = {
+    executorDataMap.values.foldLeft(0) { (num, executor) =>
+      num + executor.totalCores / scheduler.CPUS_PER_TASK
--- End diff --

Should this be saved instead of recomputed for each stage?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22001

[SPARK-24819][CORE] Fail fast when no enough slots to launch the barrier stage on job submitted

## What changes were proposed in this pull request?

We should check whether the barrier stage requires more slots (to be able to launch all tasks in the barrier stage together) than the total number of currently active slots, and fail fast when a submitted barrier stage requires more slots than that total.

This PR proposes adding a new method, `getNumSlots()`, to `SchedulerBackend` that returns the total number of currently active slots. Support for this method has been added to all first-class scheduler backends except `MesosFineGrainedSchedulerBackend`.

## How was this patch tested?

Added new test cases in `BarrierStageOnSubmittedSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jiangxb1987/spark SPARK-24819

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22001.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22001

commit 52530052c896862748a86a1b77455f31534b6045
Author: Xingbo Jiang
Date: 2018-08-05T15:47:05Z

    Fail fast when no enough slots to launch the barrier stage on job submitted