spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets

2018-05-03 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4f1ae3af9 -> 154bbc959


[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset, so
that each one knows the partition is completed.  That way the final
active taskset does not try to submit another task for the same
partition, and it knows when it is complete and should be marked as a
"zombie".

Added a regression test.
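
The diff below adds the scheduler-side fan-out
(markPartitionCompletedInAllTaskSets), which calls
tsm.markPartitionCompleted on every attempt; the body of that
TaskSetManager method is not part of this excerpt.  A minimal,
self-contained sketch of what the per-attempt bookkeeping could look
like -- the FakeTaskSetManager name and its fields are assumptions for
illustration, not the committed code:

    // Hypothetical sketch only; the class and field names are invented for the example.
    class FakeTaskSetManager(partitionIds: Seq[Int]) {
      // Partition ids are the stable key shared by every attempt of the stage,
      // so a completion is translated into this attempt's local task index.
      private val partitionToIndex: Map[Int, Int] = partitionIds.zipWithIndex.toMap
      private val successful = Array.fill(partitionIds.length)(false)
      private var tasksSuccessful = 0
      var isZombie = false  // a zombie attempt stops launching new tasks

      // Called on *every* attempt of the stage when any attempt finishes a partition.
      def markPartitionCompleted(partitionId: Int): Unit = {
        partitionToIndex.get(partitionId).foreach { index =>
          if (!successful(index)) {
            successful(index) = true
            tasksSuccessful += 1
            if (tasksSuccessful == partitionIds.length) {
              isZombie = true  // nothing left for this attempt to run
            }
          }
        }
      }
    }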

Author: Imran Rashid 

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe6cc68e5977dd8663b8f232a287a783acb)
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/154bbc95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/154bbc95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/154bbc95

Branch: refs/heads/branch-2.2
Commit: 154bbc95989b34b2007022b6082ed17864bbaa32
Parents: 4f1ae3a
Author: Imran Rashid 
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid 
Committed: Thu May 3 10:59:45 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala  | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/154bbc95/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b6bc91..df6407b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -693,6 +693,20 @@ private[spark] class TaskSchedulerImpl private[scheduler](
 }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks.  That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/154bbc95/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2f4e46c..d9515fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -74,6 +74,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -149,7 +151,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -744,6 +746,9 @@ private[spark] class TaskSetManager(
   logInfo("Ignoring task-finished event for " + info.id + " in stage " + 
taskSet.id +
 " because task " + index + " has already completed successfully")
 }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed.  This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
 // This method is called by "TaskSchedulerImpl.

spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets

2018-05-03 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 61e7bc0c1 -> 8509284e1


[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset, so
that each one knows the partition is completed.  That way the final
active taskset does not try to submit another task for the same
partition, and it knows when it is complete and should be marked as a
"zombie".

Added a regression test.
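
One detail worth calling out from the diff below: task indices are local
to a single TaskSetManager, while a late completion arrives keyed by
partition id from a different attempt, so the new partitionToIndex map is
what translates between the two.  A hedged, self-contained illustration
(FakeTask and the helper function are invented for the example; only the
partitionToIndex idea mirrors the patch):

    // A retried attempt typically resubmits only the partitions that are still
    // missing, so its local task indices do not line up with the first attempt's.
    case class FakeTask(partitionId: Int)

    def partitionToIndex(tasks: Seq[FakeTask]): Map[Int, Int] =
      tasks.zipWithIndex.map { case (t, idx) => t.partitionId -> idx }.toMap

    val attempt0 = Seq(FakeTask(0), FakeTask(1), FakeTask(2), FakeTask(3))
    val attempt1 = Seq(FakeTask(2), FakeTask(3))  // only the still-missing partitions

    // The same late success for partition 3 maps to different local indices:
    partitionToIndex(attempt0).get(3)  // Some(3)
    partitionToIndex(attempt1).get(3)  // Some(1)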

Author: Imran Rashid 

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe6cc68e5977dd8663b8f232a287a783acb)
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8509284e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8509284e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8509284e

Branch: refs/heads/branch-2.3
Commit: 8509284e1ec048d5afa87d41071c0429924e45c9
Parents: 61e7bc0
Author: Imran Rashid 
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid 
Committed: Thu May 3 10:59:30 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala  | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806..8e97b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks.  That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8509284e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c3ed11b..b52e376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -74,6 +74,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -154,7 +156,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -755,6 +757,9 @@ private[spark] class TaskSetManager(
   logInfo("Ignoring task-finished event for " + info.id + " in stage " + 
taskSet.id +
 " because task " + index + " has already completed successfully")
 }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed.  This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
 // This method is called by "TaskSchedulerImpl.handleSuccessfulTas

spark git commit: [SPARK-23433][CORE] Late zombie task completions update all tasksets

2018-05-03 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 96a50016b -> 94641fe6c


[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure can lead to multiple tasksets being active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset, so
that each one knows the partition is completed.  That way the final
active taskset does not try to submit another task for the same
partition, and it knows when it is complete and should be marked as a
"zombie".

Added a regression test.
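
For the shape of the regression scenario (the actual test lives in
TaskSchedulerImplSuite and is not reproduced in this excerpt), here is a
hedged walk-through reusing the FakeTaskSetManager sketch shown under the
branch-2.2 commit above; the partition layout and numbers are made up:

    // Stage attempt 0 hit a fetch failure and was marked as a zombie while some
    // of its tasks were still running; attempt 1 resubmits only the missing partitions.
    val attempt0 = new FakeTaskSetManager(partitionIds = Seq(0, 1, 2, 3))
    val attempt1 = new FakeTaskSetManager(partitionIds = Seq(2, 3))
    val allAttempts = Seq(attempt0, attempt1)

    // A task from zombie attempt 0 finishes partition 2 late.  With the fix, the
    // completion is fanned out to every attempt, so attempt 1 will not launch
    // (or keep waiting on) its own copy of partition 2.
    allAttempts.foreach(_.markPartitionCompleted(2))

    // Once partition 3 also finishes, attempt 1 has nothing left and can be
    // marked as a zombie instead of hanging on the already-completed partition.
    allAttempts.foreach(_.markPartitionCompleted(3))
    assert(attempt1.isZombie)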

Author: Imran Rashid 

Closes #21131 from squito/SPARK-23433.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94641fe6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94641fe6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94641fe6

Branch: refs/heads/master
Commit: 94641fe6cc68e5977dd8663b8f232a287a783acb
Parents: 96a5001
Author: Imran Rashid 
Authored: Thu May 3 10:59:18 2018 -0500
Committer: Imran Rashid 
Committed: Thu May 3 10:59:18 2018 -0500

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  14 +++
 .../apache/spark/scheduler/TaskSetManager.scala |  20 +++-
 .../scheduler/TaskSchedulerImplSuite.scala  | 104 +++
 3 files changed, 137 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94641fe6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c11806..8e97b3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task as completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
+   * do not also submit those same tasks.  That also means that a task completion from an earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId)
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94641fe6/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8a96a76..195fc80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
   val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
+  private[scheduler] val partitionToIndex = tasks.zipWithIndex
+    .map { case (t, idx) => t.partitionId -> idx }.toMap
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
 
@@ -153,7 +155,7 @@ private[spark] class TaskSetManager(
   private[scheduler] val speculatableTasks = new HashSet[Int]
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
   logInfo("Ignoring task-finished event for " + info.id + " in stage " + 
taskSet.id +
 " because task " + index + " has already completed successfully")
 }
+    // There may be multiple tasksets for this stage -- we let all of them know that the partition
+    // was completed.  This may result in some of the tasksets getting completed.
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we shou