spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4f1ae3af9 -> 154bbc959

[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset, so that each one knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and it knows when it is complete and when it should be marked as a "zombie".

Added a regression test.

Author: Imran Rashid

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe6cc68e5977dd8663b8f232a287a783acb)
Signed-off-by: Imran Rashid

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/154bbc95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/154bbc95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/154bbc95

Branch: refs/heads/branch-2.2
Commit: 154bbc95989b34b2007022b6082ed17864bbaa32
Parents: 4f1ae3a
Author: Imran Rashid
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid
Committed: Thu May 3 10:59:45 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala      | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/154bbc95/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b6bc91..df6407b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -693,6 +693,20 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks. That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/154bbc95/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2f4e46c..d9515fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -74,6 +74,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -149,7 +151,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -744,6 +746,9 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed. This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
     // This method is called by "TaskSchedulerImpl.
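
[Editor's note] The TaskSetManager.markPartitionCompleted method called above is not visible in this truncated diff. Below is a minimal sketch of what such a method could look like, using the partitionToIndex map added in the same change and assuming TaskSetManager's existing successful, tasksSuccessful, numTasks and isZombie bookkeeping fields. This is a paraphrased reconstruction, not the verbatim commit.

  // Sketch only: reconstructed from the commit description, not copied from the diff.
  // Assumes the existing successful(index) array, tasksSuccessful counter, numTasks and
  // isZombie flag in TaskSetManager.
  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
    partitionToIndex.get(partitionId).foreach { index =>
      if (!successful(index)) {
        tasksSuccessful += 1
        successful(index) = true
        if (tasksSuccessful == numTasks) {
          // Every partition is already done, so this attempt has nothing left to run.
          isZombie = true
        }
      }
    }
  }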
spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 61e7bc0c1 -> 8509284e1

[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset, so that each one knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and it knows when it is complete and when it should be marked as a "zombie".

Added a regression test.

Author: Imran Rashid

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe6cc68e5977dd8663b8f232a287a783acb)
Signed-off-by: Imran Rashid

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8509284e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8509284e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8509284e

Branch: refs/heads/branch-2.3
Commit: 8509284e1ec048d5afa87d41071c0429924e45c9
Parents: 61e7bc0
Author: Imran Rashid
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid
Committed: Thu May 3 10:59:30 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala      | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806..8e97b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks. That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c3ed11b..b52e376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -74,6 +74,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -154,7 +156,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -755,6 +757,9 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed. This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTas
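
[Editor's note] For context, the taskSetsByStageIdAndAttempt registry iterated by markPartitionCompletedInAllTaskSets is the scheduler's existing stageId -> (stageAttemptId -> TaskSetManager) map. A paraphrased sketch of its declaration follows; the exact form in TaskSchedulerImpl may differ by version.

  // Paraphrased sketch of the existing registry; not part of this diff.
  // Each stage retry registers a new TaskSetManager under a new attempt id, which is why a
  // late completion from a zombie attempt has to be fanned out to every entry for the stage.
  private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]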
spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets
Repository: spark
Updated Branches:
  refs/heads/master 96a50016b -> 94641fe6c

[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset, so that each one knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and it knows when it is complete and when it should be marked as a "zombie".

Added a regression test.

Author: Imran Rashid

Closes #21131 from squito/SPARK-23433.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94641fe6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94641fe6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94641fe6

Branch: refs/heads/master
Commit: 94641fe6cc68e5977dd8663b8f232a287a783acb
Parents: 96a5001
Author: Imran Rashid
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid
Committed: Thu May 3 10:59:18 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala      | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/94641fe6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806..8e97b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks. That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/94641fe6/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8a96a76..195fc80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -153,7 +155,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed. This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we shou
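
[Editor's note] The comment at the point where this diff is cut off refers to TaskSchedulerImpl.handleSuccessfulTask, the caller that invokes the code above while holding the scheduler lock. A paraphrased sketch of that caller follows; it is not part of this diff and the exact body may differ by version.

  // Paraphrased sketch of the caller referenced by the truncated comment above; not part of
  // this diff. It holds the TaskSchedulerImpl lock (synchronized) while delegating to the
  // TaskSetManager, which is why expensive work such as result deserialization is kept out
  // of this lock (the SPARK-7655 issue mentioned in the truncated comment).
  def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }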