[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r230730694

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try {
    +            // killAllTaskAttempts will fail if a SchedulerBackend does not implement
    +            // killTask.
    +            logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
    +              "or zombie tasks for this job")
    +            // ResultStage is only used by this job. It's safe to kill speculative or
    +            // zombie tasks in this stage.
    +            taskScheduler.killAllTaskAttempts(
    --- End diff --

    cc @jiangxb1987 IIRC we have some similar code in barrier execution. Shall we create a util method to safely kill tasks?

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
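cloud-fan's suggestion of a shared utility for safely killing tasks could look roughly like the Scala sketch below. It is not Spark code: `TaskKiller` is a hypothetical stand-in for the scheduler (the real `TaskScheduler` has many more members) and `killAllTaskAttemptsSafely` is a hypothetical helper name. It only illustrates catching the `UnsupportedOperationException` raised by backends that do not implement `killTask`, and reporting it at warning level, as felixcheung asks in this thread.

```scala
// Hypothetical sketch: a shared helper that asks the scheduler to kill every running
// attempt of a stage and tolerates backends that do not implement killTask.
object SafeKill {
  // Minimal stand-in for the scheduler method discussed in this review (not Spark's trait).
  trait TaskKiller {
    def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
  }

  // Returns true if the kill request was issued, false if the backend cannot kill tasks.
  def killAllTaskAttemptsSafely(
      killer: TaskKiller,
      stageId: Int,
      interruptThread: Boolean,
      reason: String,
      logWarning: String => Unit = msg => System.err.println(s"WARN $msg")): Boolean = {
    try {
      killer.killAllTaskAttempts(stageId, interruptThread, reason)
      true
    } catch {
      case e: UnsupportedOperationException =>
        // Warning level: tasks that cannot be killed may keep using cluster resources.
        logWarning(s"Could not cancel tasks for stage $stageId: ${e.getMessage}")
        false
    }
  }
}
```

Returning a `Boolean` instead of rethrowing keeps the caller (here, the job-completion path) running while still letting it know that zombie or speculative tasks may be left behind.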
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22771

---
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r228409787

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try {
    +            // killAllTaskAttempts will fail if a SchedulerBackend does not implement
    +            // killTask.
    +            logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
    +              s"zombie tasks for this job")
    --- End diff --

    I created https://issues.apache.org/jira/browse/SPARK-25849 to improve the documentation.

---
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r228371144

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try {
    +            // killAllTaskAttempts will fail if a SchedulerBackend does not implement
    +            // killTask.
    +            logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
    +              s"zombie tasks for this job")
    --- End diff --

    Yes, other log messages probably also aren't very good. Maybe what we need is some additional explanation in the docs somewhere.

    The issue that I am having is that if the log messages say that Tasks are being killed or canceled or whatever, many users will assume that that means that the Tasks will no longer be running on the Executors. In fact, what it means is that the DAGScheduler isn't going to try to run them anymore, and that any previously started Tasks may or may not still be running or continue to run on the Executors -- it depends on whether the Tasks are interruptible and on whether the interrupt on cancel configuration is set to true. The log messages make sense if you understand that subtlety, so we should probably try to explain it more fully in the docs.

---
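markhamstra's distinction can be illustrated with a small, self-contained JVM model. This is not Spark's executor code: the hypothetical `ToyTask` class stands in for a task, `kill(interruptThread)` always sets a kill flag, and only when `interruptThread` is true does it also call `Thread.interrupt()` (loosely mirroring what `spark.job.interruptOnCancel` decides). A task that polls the flag stops either way, while a task blocked in `Thread.sleep` keeps running unless it is interrupted.

```scala
// Simplified JVM model, not Spark's executor code: killing a task always sets a flag,
// and only interrupts the task's thread when interruptThread is true.
object KillSemantics {
  final class ToyTask(body: ToyTask => Unit) {
    @volatile var killed: Boolean = false

    val thread: Thread = new Thread(new Runnable {
      def run(): Unit =
        try body(ToyTask.this)
        catch { case _: InterruptedException => () } // an interrupted sleep ends the task
    })
    thread.setDaemon(true) // do not keep the JVM alive if a task never stops

    def start(): Unit = thread.start()

    def kill(interruptThread: Boolean): Unit = {
      killed = true
      if (interruptThread) thread.interrupt()
    }
  }

  // Cooperative task: does small units of work and checks the kill flag between them.
  def cooperative(task: ToyTask): Unit =
    while (!task.killed) Thread.sleep(10)

  // Blocking task: sleeps for a long time and never looks at the kill flag.
  def blocking(task: ToyTask): Unit =
    Thread.sleep(60000)

  // Starts a task, kills it, and reports whether its thread finished within waitMs.
  def stopsAfterKill(body: ToyTask => Unit, interruptThread: Boolean, waitMs: Long = 2000): Boolean = {
    val task = new ToyTask(body)
    task.start()
    Thread.sleep(50) // let the task begin running
    task.kill(interruptThread)
    task.thread.join(waitMs)
    !task.thread.isAlive
  }
}
```

With these definitions, `stopsAfterKill(cooperative, false)` and `stopsAfterKill(blocking, true)` should both return true, while `stopsAfterKill(blocking, false)` should return false: the sleeping thread never observes the flag, so a "killed" task keeps occupying its thread.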
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r228355772

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try {
    +            // killAllTaskAttempts will fail if a SchedulerBackend does not implement
    +            // killTask.
    +            logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
    +              s"zombie tasks for this job")
    --- End diff --

    How about `... is finished. Cancelling ...`? This should be consistent with other places.

---
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r228354020

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try {
    +            // killAllTaskAttempts will fail if a SchedulerBackend does not implement
    +            // killTask.
    +            logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
    +              s"zombie tasks for this job")
    --- End diff --

    Isn't this misleading/confusing if !shouldInterruptTaskThread? You can attempt to kill speculative or zombie Tasks in that case, but nothing will actually happen if SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL is false.

---
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r227503789

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,19 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    --- End diff --

    The comment mentioning "cancelTasks" is no longer valid.

---
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r227459990

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +            logInfo(
    +              s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
    +            // ResultStage is only used by this job. It's safe to kill speculative or
    +            // zombie tasks in this stage.
    +            taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
    --- End diff --

    IIRC, `cancelTasks()` will fail the stage (maybe that's okay here because the stage has already been marked as completed). If we just want to kill speculative/zombie tasks, maybe we should call `killAllTaskAttempts()` instead?

---
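The difference jiangxb1987 describes can be sketched with a toy model. Everything here is hypothetical and heavily simplified (no executors, no task set managers); it assumes, as the comment recalls, that `cancelTasks` additionally aborts the stage's attempts, whereas `killAllTaskAttempts` only requests kills for running tasks. The sample data also shows why a finished result stage can still have running tasks: a zombie from an earlier stage attempt and a speculative copy in the latest attempt are both running attempts of the same stage.

```scala
// Toy model, not Spark's TaskSchedulerImpl: a stage may have several attempts,
// and each attempt tracks the IDs of its running tasks.
object ToyScheduler {
  final case class StageAttempt(attempt: Int, running: Set[Long], aborted: Boolean = false)

  final class Scheduler(initial: Map[Int, List[StageAttempt]]) {
    var attempts: Map[Int, List[StageAttempt]] = initial
    val killRequests = scala.collection.mutable.ArrayBuffer.empty[Long]

    // Request a kill for every running task in every attempt of the stage.
    // The attempts themselves are left untouched (nothing is marked as failed).
    def killAllTaskAttempts(stageId: Int): Unit =
      attempts.getOrElse(stageId, Nil).foreach(a => killRequests ++= a.running)

    // Same kill requests, but every attempt of the stage is also aborted,
    // which is the extra effect the review comment recalls for cancelTasks.
    def cancelTasks(stageId: Int): Unit = {
      killAllTaskAttempts(stageId)
      val aborted = attempts.getOrElse(stageId, Nil).map(_.copy(aborted = true))
      attempts = attempts.updated(stageId, aborted)
    }
  }
}
```

For a stage whose attempt 0 still runs task 10 (a zombie) and whose attempt 1 runs tasks 11 and 12 (one of them speculative), both calls request kills for tasks 10, 11 and 12; only `cancelTasks` also marks the attempts as aborted.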
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r227060028

    --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
    @@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
               assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
             }
           }
    +  test("cancel zombie tasks in a result stage when the job finishes") {
    +    val conf = new SparkConf()
    +      .setMaster("local-cluster[1,2,1024]")
    +      .setAppName("test-cluster")
    +      .set("spark.ui.enabled", "false")
    +      // Disable this so that if a task is running, we can make sure the executor will always send
    +      // task metrics via heartbeat to driver.
    +      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
    +      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
    +      .set("spark.executor.heartbeatInterval", "1s")
    +    sc = new SparkContext(conf)
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +    @volatile var runningTaskIds: Seq[Long] = null
    +    val listener = new SparkListener {
    +      override def onExecutorMetricsUpdate(
    +          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    +        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
    +          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
    +        }
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
    +      val context = org.apache.spark.TaskContext.get()
    +      if (context.stageAttemptNumber == 0) {
    +        if (context.partitionId == 0) {
    +          // Make the first task in the first stage attempt fail.
    +          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
    +            new java.io.IOException("fake"))
    +        } else {
    +          // Make the second task in the first stage attempt sleep to generate a zombie task
    +          Thread.sleep(6)
    +        }
    +      } else {
    +        // Make the second stage attempt successful.
    +      }
    +      x
    +    }.collect()
    +    sc.listenerBus.waitUntilEmpty(1)
    +    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
    +    // whether there is any running task.
    --- End diff --

    I prefer this approach because it makes sure the executor actually received the kill command and interrupted the tasks.

---
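Because executors report their running tasks only on each heartbeat (every 1s in this test's configuration), a check on the heartbeat-reported task IDs has to be retried until it passes or a deadline expires. A minimal polling helper is sketched below; `Polling.waitUntil` is a hypothetical name, and inside a ScalaTest suite one would more likely reach for ScalaTest's `Eventually` trait instead.

```scala
// Hypothetical polling helper (not ScalaTest's Eventually): re-evaluates `condition`
// every `intervalMs` milliseconds until it holds or `timeoutMs` milliseconds have passed.
object Polling {
  def waitUntil(timeoutMs: Long, intervalMs: Long = 100)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var satisfied = condition
    while (!satisfied && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      satisfied = condition
    }
    satisfied
  }
}
```

For example, `Polling.waitUntil(10000) { runningTaskIds != null && runningTaskIds.isEmpty }` would wait up to 10 seconds for a heartbeat that reports no running tasks, and return false if none arrives in time.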
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r226847830

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +            logInfo(
    +              s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
    +            // ResultStage is only used by this job. It's safe to kill speculative or
    +            // zombie tasks in this stage.
    +            taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
    --- End diff --

    Thanks, @zsxwing. This looks promising for reducing the flakiness in some test suites.

---
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r226818782

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +            logInfo(
    +              s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
    +            // ResultStage is only used by this job. It's safe to kill speculative or
    +            // zombie tasks in this stage.
    +            taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
    +          } catch {
    +            case e: UnsupportedOperationException =>
    +              logInfo(s"Could not cancel tasks for stage $stageId", e)
    --- End diff --

    logWarn? Aren't we leaking tasks?

---
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r226651308

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
                 if (job.numFinished == job.numPartitions) {
                   markStageAsFinished(resultStage)
                   cleanupStateForJobAndIndependentStages(job)
    +          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
    +            logInfo(
    +              s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
    --- End diff --

    The message should be updated: this covers more than speculative tasks, since it could also kill tasks in other attempts.

---
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r226522963

    --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
    @@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
               assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
             }
           }
    +  test("cancel zombie tasks in a result stage when the job finishes") {
    +    val conf = new SparkConf()
    +      .setMaster("local-cluster[1,2,1024]")
    +      .setAppName("test-cluster")
    +      .set("spark.ui.enabled", "false")
    +      // Disable this so that if a task is running, we can make sure the executor will always send
    +      // task metrics via heartbeat to driver.
    +      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
    +      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
    +      .set("spark.executor.heartbeatInterval", "1s")
    +    sc = new SparkContext(conf)
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +    @volatile var runningTaskIds: Seq[Long] = null
    +    val listener = new SparkListener {
    +      override def onExecutorMetricsUpdate(
    +          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    +        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
    +          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
    +        }
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
    +      val context = org.apache.spark.TaskContext.get()
    +      if (context.stageAttemptNumber == 0) {
    +        if (context.partitionId == 0) {
    +          // Make the first task in the first stage attempt fail.
    +          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
    +            new java.io.IOException("fake"))
    +        } else {
    +          // Make the second task in the first stage attempt sleep to generate a zombie task
    +          Thread.sleep(6)
    +        }
    +      } else {
    +        // Make the second stage attempt successful.
    +      }
    +      x
    +    }.collect()
    +    sc.listenerBus.waitUntilEmpty(1)
    +    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
    +    // whether there is any running task.
    --- End diff --

    Any reason to do it this way, rather than using the TaskStart / TaskEnd events of a SparkListener?

---
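The alternative squito asks about would track running tasks from task start and end events. The sketch below models that with hypothetical `TaskStart` / `TaskEnd` case classes standing in for `SparkListenerTaskStart` / `SparkListenerTaskEnd` (which carry far more information); a task counts as running between its two events. This captures the driver's view of task lifecycles, whereas zsxwing's reply in this thread prefers the heartbeat-based check because it confirms that the executor actually received the kill command.

```scala
// Hypothetical model of tracking running tasks from lifecycle events, in the spirit of a
// SparkListener handling task start/end events; these case classes are simplified stand-ins.
object TaskEventTracking {
  sealed trait TaskEvent
  final case class TaskStart(taskId: Long) extends TaskEvent
  final case class TaskEnd(taskId: Long) extends TaskEvent

  final class RunningTaskTracker {
    private val running = scala.collection.mutable.Set.empty[Long]

    // A task counts as running between its start event and its end event.
    def onEvent(event: TaskEvent): Unit = synchronized {
      event match {
        case TaskStart(id) => running += id
        case TaskEnd(id)   => running -= id
      }
      ()
    }

    // Snapshot of the tasks that have started but not yet ended.
    def runningTaskIds: Set[Long] = synchronized { running.toSet }
  }
}
```

The methods are `synchronized` because listener callbacks and the test thread that reads the snapshot may run on different threads.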
GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/22771

    [SPARK-25773][Core]Cancel zombie tasks in a result stage when the job finishes

    ## What changes were proposed in this pull request?

    When a job finishes, there may be some zombie tasks still running due to stage retry. Since a result stage will never be used by other jobs, running these tasks just wastes cluster resources. This PR asks the TaskScheduler to cancel the running tasks of a result stage once the stage has finished. Credit goes to @srinathshankar, who suggested this idea to me.

    This PR also fixes two minor issues while I'm touching DAGScheduler:
    - An invalid spark.job.interruptOnCancel value should not crash DAGScheduler.
    - Non-fatal errors should not crash DAGScheduler.

    ## How was this patch tested?

    The new unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-25773

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22771.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22771

commit 581ea53b57cc9fc0e89f2d635422653cfdfcb27f
Author: Shixiong Zhu
Date: 2018-10-16T22:07:04Z

    Cancel zombie tasks in a result stage when the job finishes

---
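The two minor DAGScheduler fixes listed in the PR description can be sketched in plain Scala. The names below (`SchedulerHardening`, a `shouldInterruptTaskThread` variant that takes a `java.util.Properties`, `processEvents`) are hypothetical simplifications, not the code merged in this PR: an unparsable `spark.job.interruptOnCancel` value falls back to `false` with a warning instead of throwing, and `scala.util.control.NonFatal` separates errors that are logged and survived from fatal ones such as `OutOfMemoryError`, which still propagate.

```scala
import java.util.Properties
import scala.util.control.NonFatal

// Hypothetical simplifications of the two fixes listed in the PR description;
// not the code merged in the PR.
object SchedulerHardening {
  val InterruptOnCancelKey = "spark.job.interruptOnCancel"

  // An invalid value (e.g. "yes") falls back to false with a warning instead of throwing.
  def shouldInterruptTaskThread(props: Properties, logWarning: String => Unit): Boolean =
    if (props == null) false
    else {
      val raw = props.getProperty(InterruptOnCancelKey, "false")
      try raw.toBoolean
      catch {
        case _: IllegalArgumentException =>
          logWarning(s"$InterruptOnCancelKey is invalid: $raw. Using 'false' instead")
          false
      }
    }

  // Handles each event; a non-fatal error is logged and the loop moves on, while fatal
  // errors (VirtualMachineError, InterruptedException, ...) are not caught by NonFatal.
  def processEvents[E](events: Seq[E], handle: E => Unit, logError: Throwable => Unit): Unit =
    events.foreach { event =>
      try handle(event)
      catch { case NonFatal(e) => logError(e) }
    }
}
```

Matching on `NonFatal(e)` rather than `case e: Throwable` is the usual Scala idiom for "keep going unless the JVM itself is in trouble".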