spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 17db57213 -> 144426cff


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to MedianHeap, not just increment tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty, which ultimately stops the SparkContext.

## How was this patch tested?

TaskSetManagerSuite.scala unit test: [SPARK-24677] MedianHeap should not be empty when speculation is enabled

Author: sychen

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee932cb644627c4049b5a07dd8028968572d9)
Signed-off-by: Thomas Graves

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144426cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144426cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144426cf

Branch: refs/heads/branch-2.2
Commit: 144426cffd6e4b26b676004f5489e218140f7df2
Parents: 17db572
Author: sychen
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves
Committed: Wed Jul 18 13:26:24 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49
 3 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index df6407b..f8c62b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -701,9 +701,12 @@ private[spark] class TaskSchedulerImpl private[scheduler](
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d9515fb..705b896 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -748,7 +748,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -759,9 +759,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }

-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {


http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
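
Editor's note: the failure mode described in the commit message can be reproduced in isolation. The sketch below is a minimal, self-contained Scala approximation; SimpleMedianHeap and SpeculationBugSketch are illustrative stand-ins, not Spark's actual org.apache.spark.util.collection.MedianHeap or scheduler code. It only shows why a success counter that outruns the recorded durations makes a median query throw.

import scala.collection.mutable

// Illustrative stand-in for Spark's MedianHeap: two priority queues, with the
// smaller half kept in a max-heap and the larger half in a min-heap.
class SimpleMedianHeap {
  private val smallerHalf = mutable.PriorityQueue.empty[Double]
  private val largerHalf = mutable.PriorityQueue.empty[Double](Ordering[Double].reverse)

  def isEmpty: Boolean = smallerHalf.isEmpty && largerHalf.isEmpty

  def insert(x: Double): Unit = {
    if (isEmpty || x > median) largerHalf.enqueue(x) else smallerHalf.enqueue(x)
    // Rebalance so the two halves never differ in size by more than one.
    if (largerHalf.size - smallerHalf.size > 1) smallerHalf.enqueue(largerHalf.dequeue())
    if (smallerHalf.size - largerHalf.size > 1) largerHalf.enqueue(smallerHalf.dequeue())
  }

  def median: Double = {
    if (isEmpty) throw new NoSuchElementException("MedianHeap is empty.")
    if (largerHalf.size > smallerHalf.size) largerHalf.head
    else if (smallerHalf.size > largerHalf.size) smallerHalf.head
    else (largerHalf.head + smallerHalf.head) / 2.0
  }
}

object SpeculationBugSketch {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = new SimpleMedianHeap
    var tasksSuccessful = 0

    // Pre-patch behaviour: a partition completed by another task-set attempt
    // bumps the success counter without recording a duration.
    tasksSuccessful += 1

    // The speculation check assumes a duration exists for every counted success,
    // so querying the median of an empty heap throws, mirroring the reported
    // java.util.NoSuchElementException: MedianHeap is empty.
    if (tasksSuccessful > 0) {
      try {
        println(s"median duration = ${successfulTaskDurations.median}")
      } catch {
        case e: NoSuchElementException => println(s"crash reproduced: $e")
      }
    }
  }
}

Run with a stock Scala toolchain, this prints the reproduced exception instead of a median, which is the condition the patch guards against.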

spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 e31b4766b -> 7be70e29d


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to MedianHeap, not just increment tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty, which ultimately stops the SparkContext.

## How was this patch tested?

TaskSetManagerSuite.scala unit test: [SPARK-24677] MedianHeap should not be empty when speculation is enabled

Author: sychen

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee932cb644627c4049b5a07dd8028968572d9)
Signed-off-by: Thomas Graves

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7be70e29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7be70e29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7be70e29

Branch: refs/heads/branch-2.3
Commit: 7be70e29dd92de36dbb30ce39623d588f48e4cac
Parents: e31b476
Author: sychen
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves
Committed: Wed Jul 18 13:24:54 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49
 3 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f..56c0bf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b52e376..c90ae50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -759,7 +759,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -770,9 +770,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
  }

-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {


http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
Repository: spark
Updated Branches:
  refs/heads/master fc0c8c971 -> c8bee932c


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?

When speculation is enabled, TaskSetManager#markPartitionCompleted should write the successful task's duration to MedianHeap, not just increment tasksSuccessful. Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but MedianHeap is empty, so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty, which ultimately stops the SparkContext.

## How was this patch tested?

TaskSetManagerSuite.scala unit test: [SPARK-24677] MedianHeap should not be empty when speculation is enabled

Author: sychen

Closes #21656 from cxzl25/fix_MedianHeap_empty.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8bee932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8bee932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8bee932

Branch: refs/heads/master
Commit: c8bee932cb644627c4049b5a07dd8028968572d9
Parents: fc0c8c9
Author: sychen
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves
Committed: Wed Jul 18 13:24:41 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49
 3 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f..56c0bf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a18c665..6071605 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }

-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {


http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git
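
Editor's note: the essence of the fix is a bookkeeping invariant: whenever speculation is enabled and the task set is not a zombie, every success counted by markPartitionCompleted also records a duration. The snippet below is a hypothetical, simplified rendering of that invariant, not Spark code; PatchedBookkeepingSketch and TaskInfoLike are made-up names, and a plain buffer stands in for MedianHeap.

import scala.collection.mutable

object PatchedBookkeepingSketch {
  // Hypothetical stand-in for the task metadata the patch threads through.
  final case class TaskInfoLike(durationMs: Long)

  // Simplified stand-ins for the TaskSetManager state touched by the patch.
  val speculationEnabled: Boolean = true
  var isZombie: Boolean = false
  var tasksSuccessful: Int = 0
  val successfulTaskDurations = mutable.ArrayBuffer.empty[Long] // stand-in for MedianHeap

  // Mirrors the patched flow: record the duration before counting the success.
  def markPartitionCompleted(taskInfo: TaskInfoLike): Unit = {
    if (speculationEnabled && !isZombie) {
      successfulTaskDurations += taskInfo.durationMs
    }
    tasksSuccessful += 1
  }

  def main(args: Array[String]): Unit = {
    markPartitionCompleted(TaskInfoLike(durationMs = 1200))
    markPartitionCompleted(TaskInfoLike(durationMs = 1800))

    // Invariant restored by the patch: with speculation on, counted successes
    // always have durations, so a duration query cannot hit an empty collection.
    assert(tasksSuccessful == 0 || successfulTaskDurations.nonEmpty)
    val sorted = successfulTaskDurations.sorted
    println(s"middle duration = ${sorted(sorted.size / 2)} ms")
  }
}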