spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

2018-07-18 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 17db57213 -> 144426cff


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?
When speculation is enabled, TaskSetManager#markPartitionCompleted should write
the successful task's duration to MedianHeap, not just increase tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is
non-zero but MedianHeap is empty, so successfulTaskDurations.median throws
java.util.NoSuchElementException: MedianHeap is empty. This ultimately stops the
SparkContext.
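
A rough illustration of the failure mode, using a plain buffer as a stand-in for
Spark's private MedianHeap (names here are illustrative, not Spark's actual classes):

// MedianHeapInvariantSketch.scala -- simplified stand-in, not Spark code.
// The bug: tasksSuccessful is bumped for a partition completed in another task set,
// but no duration is recorded, so the heap stays empty while the counter is > 0.
import scala.collection.mutable.ArrayBuffer

object MedianHeapInvariantSketch {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = ArrayBuffer.empty[Long] // stand-in for MedianHeap
    var tasksSuccessful = 0

    // Pre-fix markPartitionCompleted: only the counter moves.
    tasksSuccessful += 1

    // checkSpeculatableTasks gates on the counter, then needs the median...
    if (tasksSuccessful >= 1) {
      if (successfulTaskDurations.isEmpty) {
        // ...which reproduces the reported error.
        throw new NoSuchElementException("MedianHeap is empty.")
      }
      val sorted = successfulTaskDurations.sorted
      println(s"median duration: ${sorted(sorted.length / 2)} ms")
    }
  }
}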

## How was this patch tested?
Added a unit test in TaskSetManagerSuite.scala:
"[SPARK-24677] MedianHeap should not be empty when speculation is enabled".

Author: sychen 

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee932cb644627c4049b5a07dd8028968572d9)
Signed-off-by: Thomas Graves 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144426cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144426cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144426cf

Branch: refs/heads/branch-2.2
Commit: 144426cffd6e4b26b676004f5489e218140f7df2
Parents: 17db572
Author: sychen 
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves 
Committed: Wed Jul 18 13:26:24 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49 
 3 files changed, 59 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index df6407b..f8c62b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -701,9 +701,12 @@ private[spark] class TaskSchedulerImpl private[scheduler](
    * do not also submit those same tasks.  That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d9515fb..705b896 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -748,7 +748,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed.  This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -759,9 +759,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

2018-07-18 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 e31b4766b -> 7be70e29d


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?
When speculation is enabled, TaskSetManager#markPartitionCompleted should write
the successful task's duration to MedianHeap, not just increase tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is
non-zero but MedianHeap is empty, so successfulTaskDurations.median throws
java.util.NoSuchElementException: MedianHeap is empty. This ultimately stops the
SparkContext.
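
For context, the path that leaves the heap empty involves more than one
TaskSetManager for the same stage (see the "There may be multiple tasksets for this
stage" comment in the diff below). A toy model of that propagation, with hypothetical
names rather than Spark's real scheduler classes:

// ToyScheduler.scala -- toy model, not Spark's scheduler. When one attempt finishes a
// partition, every attempt for the stage is notified; the fix makes that notification
// record the duration as well as bump the counter.
import scala.collection.mutable.ArrayBuffer

class ToyTaskSetManager(speculationEnabled: Boolean) {
  val successfulTaskDurations = ArrayBuffer.empty[Long] // MedianHeap stand-in
  var tasksSuccessful = 0

  // Post-fix behaviour of markPartitionCompleted, simplified.
  def markPartitionCompleted(durationMs: Long): Unit = {
    if (speculationEnabled) successfulTaskDurations += durationMs
    tasksSuccessful += 1
  }
}

object ToyScheduler {
  def main(args: Array[String]): Unit = {
    val attempts = Seq(new ToyTaskSetManager(true), new ToyTaskSetManager(true))

    // Mirrors markPartitionCompletedInAllTaskSets: every attempt hears about the success.
    attempts.foreach(_.markPartitionCompleted(durationMs = 1200L))

    // Each attempt can now compute a median without hitting an empty heap.
    assert(attempts.forall(a => a.tasksSuccessful == 1 && a.successfulTaskDurations.nonEmpty))
  }
}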

## How was this patch tested?
Added a unit test in TaskSetManagerSuite.scala:
"[SPARK-24677] MedianHeap should not be empty when speculation is enabled".

Author: sychen 

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee932cb644627c4049b5a07dd8028968572d9)
Signed-off-by: Thomas Graves 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7be70e29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7be70e29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7be70e29

Branch: refs/heads/branch-2.3
Commit: 7be70e29dd92de36dbb30ce39623d588f48e4cac
Parents: e31b476
Author: sychen 
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves 
Committed: Wed Jul 18 13:24:54 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49 
 3 files changed, 59 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f..56c0bf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks.  That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b52e376..c90ae50 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -759,7 +759,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed.  This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -770,9 +770,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7be70e29/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

2018-07-18 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master fc0c8c971 -> c8bee932c


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?
When speculation is enabled, TaskSetManager#markPartitionCompleted should write
the successful task's duration to MedianHeap, not just increase tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is
non-zero but MedianHeap is empty, so successfulTaskDurations.median throws
java.util.NoSuchElementException: MedianHeap is empty. This ultimately stops the
SparkContext.
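
Roughly how the median feeds speculation (constants and names here are illustrative;
the real logic lives in TaskSetManager#checkSpeculatableTasks): tasks still running
well past the median duration of completed tasks become speculation candidates, which
is why an empty MedianHeap is fatal on this path.

// SpeculationThresholdSketch.scala -- simplified sketch, not Spark's implementation.
object SpeculationThresholdSketch {
  def main(args: Array[String]): Unit = {
    val completedMs = Seq(900L, 1000L, 1100L, 1200L, 5000L)   // durations of finished tasks
    val runningMs = Map("task-7" -> 9000L, "task-8" -> 1100L) // still-running tasks

    val sorted = completedMs.sorted
    val medianMs = sorted(sorted.length / 2) // the value MedianHeap.median supplies
    val thresholdMs = 1.5 * medianMs         // akin to a speculation multiplier

    // Tasks running past the threshold would be re-launched speculatively.
    val speculatable = runningMs.collect { case (id, elapsed) if elapsed > thresholdMs => id }
    println(s"median=$medianMs ms threshold=$thresholdMs ms speculatable=$speculatable")
    // Had no duration ever been recorded, asking for the median here is what raised
    // java.util.NoSuchElementException: MedianHeap is empty.
  }
}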

## How was this patch tested?
Added a unit test in TaskSetManagerSuite.scala:
"[SPARK-24677] MedianHeap should not be empty when speculation is enabled".

Author: sychen 

Closes #21656 from cxzl25/fix_MedianHeap_empty.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8bee932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8bee932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8bee932

Branch: refs/heads/master
Commit: c8bee932cb644627c4049b5a07dd8028968572d9
Parents: fc0c8c9
Author: sychen 
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves 
Committed: Wed Jul 18 13:24:41 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49 
 3 files changed, 59 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f..56c0bf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks.  That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a18c665..6071605 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed.  This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
--
diff --git