This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d124ce9  [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks
d124ce9 is described below

commit d124ce9c7ea537c59a776c05977c9f918d38febc
Author:     Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue May 7 12:02:08 2019 -0500

    [SPARK-27590][CORE] do not consider skipped tasks when scheduling speculative tasks

    ## What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/24375

    When `TaskSetManager` skips a task because its corresponding partition was
    already completed by another `TaskSetManager`, the duration recorded for
    that task by the other `TaskSetManager` should not be used when this
    `TaskSetManager` schedules its own speculative tasks.

    ## How was this patch tested?

    Updated test case.

    Closes #24485 from cloud-fan/minor.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  3 +--
 .../apache/spark/scheduler/TaskResultGetter.scala  |  5 ++---
 .../org/apache/spark/scheduler/TaskScheduler.scala |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 ++++-------
 .../apache/spark/scheduler/TaskSetManager.scala    | 24 +++++++++++-----------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  6 ++----
 .../scheduler/ExternalClusterManagerSuite.scala    |  3 +--
 .../spark/scheduler/TaskSetManagerSuite.scala      |  4 ++--
 8 files changed, 25 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b817eb6..1d4972e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1394,8 +1394,7 @@ private[spark] class DAGScheduler(
       // finished. Here we notify the task scheduler to skip running tasks for the same partition,
       // to save resource.
       if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
-        taskScheduler.notifyPartitionCompletion(
-          stageId, task.partitionId, event.taskInfo.duration)
+        taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
       }
 
       task match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 09c4d9b..9b7f901 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -158,10 +158,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   // This method calls `TaskSchedulerImpl.handlePartitionCompleted` asynchronously. We do not want
   // DAGScheduler to call `TaskSchedulerImpl.handlePartitionCompleted` directly, as it's
   // synchronized and may hurt the throughput of the scheduler.
-  def enqueuePartitionCompletionNotification(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+  def enqueuePartitionCompletionNotification(stageId: Int, partitionId: Int): Unit = {
     getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
-      scheduler.handlePartitionCompleted(stageId, partitionId, taskDuration)
+      scheduler.handlePartitionCompleted(stageId, partitionId)
     })
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1862e16..bfdbf02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -70,7 +70,7 @@ private[spark] trait TaskScheduler {
 
   // Notify the corresponding `TaskSetManager`s of the stage, that a partition has already completed
   // and they can skip running tasks for it.
-  def notifyPartitionCompletion(stageId: Int, partitionId: Int, taskDuration: Long)
+  def notifyPartitionCompletion(stageId: Int, partitionId: Int)
 
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7e820c3..532eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -301,9 +301,8 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
-    taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId, taskDuration)
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
+    taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId)
   }
 
   /**
@@ -651,12 +650,9 @@ private[spark] class TaskSchedulerImpl(
    * means that a task completion from an earlier zombie attempt can lead to the entire stage
    * getting marked as successful.
    */
-  private[scheduler] def handlePartitionCompleted(
-      stageId: Int,
-      partitionId: Int,
-      taskDuration: Long) = synchronized {
+  private[scheduler] def handlePartitionCompleted(stageId: Int, partitionId: Int) = synchronized {
     taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskDuration)
+      tsm.markPartitionCompleted(partitionId)
     })
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b3aa814..52323b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -62,14 +62,8 @@ private[spark] class TaskSetManager(
   private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
   private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
 
-  // Quantile of tasks at which to start speculation
-  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
-  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
-
   val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
-  val speculationEnabled = conf.get(SPECULATION_ENABLED)
-
   // Serializer for closures and tasks.
   val env = SparkEnv.get
   val ser = env.closureSerializer.newInstance()
 
@@ -80,6 +74,12 @@ private[spark] class TaskSetManager(
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
+  val speculationEnabled = conf.get(SPECULATION_ENABLED)
+  // Quantile of tasks at which to start speculation
+  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
+  val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
+
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
   // be re-run because the missing map data needs to be regenerated first.
@@ -816,12 +816,9 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskDuration: Long): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
-        if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskDuration)
-        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
@@ -1035,10 +1032,13 @@ private[spark] class TaskSetManager(
       return false
     }
     var foundTasks = false
-    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
-    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
+    // It's possible that a task is marked as completed by the scheduler, then the size of
+    // `successfulTaskDurations` may not equal to `tasksSuccessful`. Here we should only count the
+    // tasks that are submitted by this `TaskSetManager` and are completed successfully.
+    val numSuccessfulTasks = successfulTaskDurations.size()
+    if (numSuccessfulTasks >= minFinishedForSpeculation) {
       val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
       val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 749e47c..d58ee4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -157,8 +157,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
   override def killAllTaskAttempts(
     stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
     taskSets.filter(_.stageId == stageId).lastOption.foreach { ts =>
       val tasks = ts.tasks.filter(_.partitionId == partitionId)
       assert(tasks.length == 1)
@@ -668,8 +667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       stageId: Int, interruptThread: Boolean, reason: String): Unit = {
     throw new UnsupportedOperationException
   }
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {
     throw new UnsupportedOperationException
   }
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 347064d..ead34e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -84,8 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
     taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
   override def killAllTaskAttempts(
     stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
-  override def notifyPartitionCompletion(
-      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {}
+  override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 0666bc3..72c6ab9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1394,8 +1394,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskSetManager = sched.taskSetManagerForAttempt(0, 0).get
     assert(taskSetManager.runningTasks === 8)
-    taskSetManager.markPartitionCompleted(8, 0)
-    assert(!taskSetManager.successfulTaskDurations.isEmpty())
+    taskSetManager.markPartitionCompleted(8)
+    assert(taskSetManager.successfulTaskDurations.isEmpty())
 
     taskSetManager.checkSpeculatableTasks(0)
   }
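
For readers outside the scheduler code, the logic this patch adjusts can be summarized as: once a quantile of the task set has finished, any task running longer than a multiple of the median successful duration becomes a speculation candidate. Below is a minimal, self-contained Scala sketch of that idea, not Spark's actual implementation: `TaskDurations`, `SimpleTaskSet`, and the hard-coded defaults are hypothetical stand-ins (Spark's real code uses a `MedianHeap` and locality-aware offer logic). What the sketch illustrates is the fix above: the quantile gate counts recorded durations, i.e. tasks this manager actually ran, so a partition completed by another `TaskSetManager` advances completion but never skews the median.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's MedianHeap of successful task durations.
// Only durations of tasks run by *this* task set are ever inserted.
class TaskDurations {
  private val durations = ArrayBuffer.empty[Long]
  def insert(d: Long): Unit = durations += d
  def size: Int = durations.size
  def median: Long =
    if (durations.isEmpty) 0L else durations.sorted.apply(durations.size / 2)
}

// Simplified speculation check mirroring the patched logic: the quantile
// gate counts recorded durations (tasks this manager ran to success), not
// partitions merely marked complete by another attempt of the stage.
class SimpleTaskSet(numTasks: Int,
                    speculationQuantile: Double = 0.75,
                    speculationMultiplier: Double = 1.5,
                    minTimeToSpeculation: Long = 100L) {
  val successfulTaskDurations = new TaskDurations
  var tasksSuccessful = 0

  // A task this manager actually ran finished: record its duration.
  def taskSucceeded(durationMs: Long): Unit = {
    tasksSuccessful += 1
    successfulTaskDurations.insert(durationMs)
  }

  // The partition finished in another attempt: count it as done,
  // but record no duration (this is what SPARK-27590 changes).
  def partitionCompletedElsewhere(): Unit = {
    tasksSuccessful += 1
  }

  def speculationThreshold: Option[Long] = {
    val minFinished = math.max((speculationQuantile * numTasks).floor.toInt, 1)
    // Gate on recorded durations, not on tasksSuccessful.
    if (successfulTaskDurations.size >= minFinished) {
      Some(math.max(
        (speculationMultiplier * successfulTaskDurations.median).toLong,
        minTimeToSpeculation))
    } else {
      None
    }
  }
}

object SpeculationSketch {
  def main(args: Array[String]): Unit = {
    val ts = new SimpleTaskSet(numTasks = 4)
    ts.taskSucceeded(200L)
    ts.taskSucceeded(240L)
    ts.partitionCompletedElsewhere() // contributes no duration
    // minFinished = max(floor(0.75 * 4), 1) = 3; only 2 durations recorded,
    // so no threshold yet despite 3 "successful" tasks.
    println(ts.speculationThreshold) // None
    ts.taskSucceeded(260L)
    println(ts.speculationThreshold) // Some(max(1.5 * 240, 100)) = Some(360)
  }
}

Running the example prints `None` and then `Some(360)`: with three tasks counted successful but only two durations recorded, speculation stays off until this task set has produced enough samples of its own.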
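
A second design point preserved in the diff is the comment in `TaskResultGetter`: `TaskSchedulerImpl.handlePartitionCompleted` is synchronized, so the DAGScheduler enqueues the notification on the result getter's thread pool instead of calling it inline from its event loop. Below is a generic sketch of that hand-off under assumed names (`Scheduler`, `notifyAsync`) and a single-thread pool; it shows the pattern, not Spark's code.

import java.util.concurrent.Executors

// Hypothetical scheduler whose bookkeeping is guarded by its own lock.
class Scheduler {
  def handlePartitionCompleted(stageId: Int, partitionId: Int): Unit =
    synchronized {
      // Mutate per-stage state under the scheduler lock.
      println(s"stage $stageId: partition $partitionId completed")
    }
}

object AsyncNotifySketch {
  private val notifier = Executors.newSingleThreadExecutor()

  // The caller (think: DAGScheduler event loop) must not block on the
  // scheduler lock, so the synchronized call runs on a separate pool.
  def notifyAsync(scheduler: Scheduler, stageId: Int, partitionId: Int): Unit =
    notifier.execute(() => scheduler.handlePartitionCompleted(stageId, partitionId))

  def main(args: Array[String]): Unit = {
    val s = new Scheduler
    notifyAsync(s, stageId = 0, partitionId = 7) // returns immediately
    notifier.shutdown()
  }
}

The caller returns immediately; correctness relies on the completion handler being idempotent, which the `if (!successful(index))` guard in `markPartitionCompleted` above provides.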