[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-07-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r203435933
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
 maybeFinishTaskSet()
   }
 
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+    partitionToIndex.get(partitionId).foreach { index =>
+      if (!successful(index)) {
+        tasksSuccessful += 1
+        successful(index) = true
+        if (tasksSuccessful == numTasks) {
+          isZombie = true
+        }
+        maybeFinishTaskSet()
--- End diff --

It's too minor. If we touch this file again, let's remove it; otherwise let's not bother with it.


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-07-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r203415036
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
 maybeFinishTaskSet()
   }
 
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+    partitionToIndex.get(partitionId).foreach { index =>
+      if (!successful(index)) {
+        tasksSuccessful += 1
+        successful(index) = true
+        if (tasksSuccessful == numTasks) {
+          isZombie = true
+        }
+        maybeFinishTaskSet()
--- End diff --

I think you're right, it's not needed: it's called when tasks succeed, fail, or are aborted, and when it's called while that taskset still has running tasks it's a no-op, because it would fail the `runningTasks == 0` check inside `maybeFinishTaskSet()`.

Do you think it's worth removing? I'm fine either way.

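For context, here is a minimal sketch of the check being referred to. It paraphrases `maybeFinishTaskSet()` from the surrounding TaskSetManager code and is not part of this diff, so treat the exact body as an assumption:

```scala
// Sketch: the taskset is only torn down once nothing is running any more, so an
// extra call from markPartitionCompleted while tasks are still running is a no-op.
private def maybeFinishTaskSet(): Unit = {
  if (isZombie && runningTasks == 0) {
    sched.taskSetFinished(this)  // notify the scheduler exactly once, at the very end
  }
}
```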

---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-07-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r203257926
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
 maybeFinishTaskSet()
   }
 
+  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+    partitionToIndex.get(partitionId).foreach { index =>
+      if (!successful(index)) {
+        tasksSuccessful += 1
+        successful(index) = true
+        if (tasksSuccessful == numTasks) {
+          isZombie = true
+        }
+        maybeFinishTaskSet()
--- End diff --

is this line needed? We will call `maybeFinishTaskSet()` at the end of 
`handleSuccessfulTask`


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-05-03 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/21131


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183797532
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

The explanation is quite clear and I understand now. Thank you very much! @squito


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183793745
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

The assumption is that a fetch failure means that *all* data on that host is unavailable. As shuffles are all-to-all, it's very likely that every task is going to need some piece of data from that host. It's possible that they already grabbed all the data they need before the problem occurred with the host -- we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can and then just wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause", nor a check for the data on another source).

Also, the DAGScheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest):

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1391

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1406
 
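To make the all-to-all point concrete, here is a tiny self-contained illustration (plain Scala, not Spark internals; the host names and counts are made up):

```scala
object ShuffleFanoutSketch extends App {
  // 9 map outputs spread across 3 hosts; every reduce task needs a block from every map task.
  val hostOfMap = (0 until 9).map(m => s"host-${m % 3}")
  val blocksNeededByReduce: Int => Seq[String] = _ => hostOfMap
  val lostHost = "host-1"
  // A reduce task fails as soon as any block it needs lives on the lost host.
  val affected = (0 until 4).count(r => blocksNeededByReduce(r).contains(lostHost))
  println(s"$affected of 4 reduce tasks need something from $lostHost")  // prints "4 of 4 ..."
}
```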


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183790463
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partition 

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183789814
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

> because they won't be able to get their shuffle input, same as the original fetch failure

Why? In `DAGScheduler`, we only unregister one MapStatus of the parent stage, so the other running tasks within the failed (child) stage (the stage with the fetch-failed task) may still get MapOutputs from `MapOutputTrackerMaster` and fetch data from other `Executor`s. So they can succeed normally.
Am I missing something?


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183783006
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
--- End diff --

As far as I can see, `remainingTasks` is always the same as `finalAttemptLaunchedPartitions`. I am wondering whether it would be more readable to use `finalAttemptLaunchedPartitions` here for the initialisation.

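A quick check of that identity with hypothetical partition numbers (the actual launched set depends on the offers in the test):

```scala
val all = (0 until 10).toSet
val launched = Set(0, 2, 4, 6, 8)   // stands in for finalAttemptLaunchedPartitions
val pending = all.diff(launched)    // finalAttemptPendingPartitions
val remaining = all.diff(pending)   // remainingTasks
assert(remaining == launched)       // holds because launched is a subset of all
```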

---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183781695
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

We've previously debated what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion. I'm just trying to do a correctness fix here. Briefly, the general expectation is that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. Note that even if we did try to actively kill them, you'd still need to handle a late completion, as killing would only be "best-effort".


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183779968
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
--- End diff --

good point, fixed


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183778637
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partition %

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183777930
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task has completed in all TaskSetManagers for the given stage.
+   *
+   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
--- End diff --

Yeah, the terminology is a bit of a mess here ... I don't know if we consistently distinguish the use of "active" for the one taskset which is non-zombie vs. all the tasksets which still have some tasks running (though all but one of those must be zombies).
@markhamstra @kayousterhout thoughts on naming?

In any case, I think you're right; I will remove "active" here.

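For readers following the thread, a sketch of the TaskSchedulerImpl hook this scaladoc sits on; only the comment lines are quoted in the diff above, so the body here is a plausible reconstruction rather than the merged code:

```scala
// Sketch: fan a completed partition out to every TaskSetManager for the stage,
// zombie or not, so no attempt keeps waiting on a partition that is already done.
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Unit = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.markPartitionCompleted(partitionId)
  }
}
```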

---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183776603
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partition %

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183775077
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
--- End diff --

This condition is not needed, as `stageAttempt` only takes the values 0 and 1 here.


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183774355
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partit

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183775539
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partition %

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183766277
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partit

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183701269
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
--- End diff --

Yet, launched tasks have nothing to do with the running tasks in other `TaskSet`s. But is it possible to take those running tasks into consideration when launching a new task (in the source code)? For example, launching the FetchFailed task, or tasks which do not have a running copy across `TaskSet`s, first?

(But it seems we will always have running copies in other `TaskSet`s for our final `TaskSet`, except for the FetchFailed task, right? It's more like we are not really resubmitting a stage, but resubmitting the tasks which do not have running copies across the previous `TaskSet`s.)


---




[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183690133
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite 
with LocalSparkContext with B
   taskScheduler.initialize(new FakeSchedulerBackend)
 }
   }
+
+  test("Completions in zombie tasksets update status of non-zombie 
taskset") {
+val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+val valueSer = SparkEnv.get.serializer.newInstance()
+
+def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): 
Unit = {
+  val indexInTsm = tsm.partitionToIndex(partition)
+  val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == 
indexInTsm).head
+  val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
+  tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
+}
+
+// Submit a task set, have it fail with a fetch failed, and then 
re-submit the task attempt,
+// two times, so we have three active task sets for one stage.  (For 
this to really happen,
+// you'd need the previous stage to also get restarted, and then 
succeed, in between each
+// attempt, but that happens outside what we're mocking here.)
+val zombieAttempts = (0 until 2).map { stageAttempt =>
+  val attempt = FakeTask.createTaskSet(10, stageAttemptId = 
stageAttempt)
+  taskScheduler.submitTasks(attempt)
+  val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
+  val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+  taskScheduler.resourceOffers(offers)
+  assert(tsm.runningTasks === 10)
+  if (stageAttempt < 2) {
+// fail attempt
+tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, 
TaskState.FAILED,
+  FetchFailed(null, 0, 0, 0, "fetch failed"))
+// the attempt is a zombie, but the tasks are still running (this 
could be true even if
+// we actively killed those tasks, as killing is best-effort)
+assert(tsm.isZombie)
+assert(tsm.runningTasks === 9)
+  }
+  tsm
+}
+
+// we've now got 2 zombie attempts, each with 9 tasks still active.  
Submit the 3rd attempt for
+// the stage, but this time with insufficient resources so not all 
tasks are active.
+
+val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
+taskScheduler.submitTasks(finalAttempt)
+val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", 
s"host-$idx", 1) }
+val finalAttemptLaunchedPartitions = 
taskScheduler.resourceOffers(offers).flatten.map { task =>
+  finalAttempt.tasks(task.index).partitionId
+}.toSet
+assert(finalTsm.runningTasks === 5)
+assert(!finalTsm.isZombie)
+
+// We simulate late completions from our zombie tasksets, 
corresponding to all the pending
+// partitions in our final attempt.  This means we're only waiting on 
the tasks we've already
+// launched.
+val finalAttemptPendingPartitions = (0 until 
10).toSet.diff(finalAttemptLaunchedPartitions)
+finalAttemptPendingPartitions.foreach { partition =>
+  completeTaskSuccessfully(zombieAttempts(0), partition)
+}
+
+// If there is another resource offer, we shouldn't run anything.  
Though our final attempt
+// used to have pending tasks, now those tasks have been completed by 
zombie attempts.  The
+// remaining tasks to compute are already active in the non-zombie 
attempt.
+assert(
+  taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", 
"host-1", 1))).flatten.isEmpty)
+
+val allTaskSets = zombieAttempts ++ Seq(finalTsm)
+val remainingTasks = (0 until 
10).toSet.diff(finalAttemptPendingPartitions)
+
+// finally, if we finish the remaining partitions from a mix of 
tasksets, all attempts should be
+// marked as zombie.
+// for each of the remaining tasks, find the tasksets with an active 
copy of the task, and
+// finish the task.
+remainingTasks.foreach { partition =>
+  val tsm = if (partition == 0) {
+// we failed this task on both zombie attempts, this one is only 
present in the latest
+// taskset
+finalTsm
+  } else {
+// should be active in every taskset.  We choose a zombie taskset 
just to make sure that
+// we transition the active taskset correctly even if the final 
completion comes
+// from a zombie.
+zombieAttempts(partition 

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183619646
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task has completed in all TaskSetManagers for the given 
stage.
+   *
+   * After stage failure and retry, there may be multiple active 
TaskSetManagers for the stage.
--- End diff --

IIRC, there's only one active `TaskSetManager` for a given stage, possibly 
along with some zombie `TaskSetManager`s. There may still be running tasks 
in the zombie `TaskSetManager`s, though.
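
A minimal sketch of that invariant, with a made-up `StageAttempt` type 
standing in for `TaskSetManager` (illustration only, not Spark code):

```scala
// At most one non-zombie (active) attempt per stage; zombie attempts may
// still have running tasks.
final case class StageAttempt(attemptId: Int, isZombie: Boolean, runningTasks: Int)

object ActiveAttemptInvariant {
  def activeAttempt(attempts: Seq[StageAttempt]): Option[StageAttempt] = {
    val active = attempts.filterNot(_.isZombie)
    require(active.size <= 1, s"expected at most one active attempt, found ${active.size}")
    active.headOption
  }

  def main(args: Array[String]): Unit = {
    val attempts = Seq(
      StageAttempt(0, isZombie = true, runningTasks = 9),  // failed with a FetchFailed
      StageAttempt(1, isZombie = true, runningTasks = 9),
      StageAttempt(2, isZombie = false, runningTasks = 5)) // the current attempt
    println(activeAttempt(attempts).map(_.attemptId))      // Some(2)
  }
}
```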


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-24 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21131#discussion_r183619704
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 
+  /**
+   * Marks the task has completed in all TaskSetManagers for the given 
stage.
+   *
+   * After stage failure and retry, there may be multiple active 
TaskSetManagers for the stage.
+   * If an earlier attempt of a stage completes a task, we should ensure 
that the later attempts
+   * do not also submit those same tasks.  That also means that a task 
completion from an  earlier
+   * attempt can lead to the entire stage getting marked as successful.
+   */
+  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, 
partitionId: Int) = {
+taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { 
tsm =>
--- End diff --

Generally, it seems impossible for an unfinished `TaskSet` to get an empty 
`Map()` from `taskSetsByStageIdAndAttempt`. But if it does, maybe we can 
tell the caller that the stage has already finished.
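
A hedged sketch of that suggestion, assuming the helper simply reports 
whether it found any managers for the stage. The names mirror the diff, but 
the types are simplified and this is not the actual implementation:

```scala
import scala.collection.mutable

// Simplified stand-in for a TaskSetManager: it only tracks completed partitions.
class PartitionTrackerSketch {
  private val completed = mutable.BitSet()
  def markPartitionCompleted(partitionId: Int): Unit = completed += partitionId
}

class SchedulerSketch {
  // stageId -> (stageAttemptId -> manager)
  val taskSetsByStageIdAndAttempt =
    mutable.HashMap[Int, mutable.HashMap[Int, PartitionTrackerSketch]]()

  def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Boolean = {
    val managers = taskSetsByStageIdAndAttempt
      .getOrElse(stageId, mutable.HashMap.empty[Int, PartitionTrackerSketch])
    managers.values.foreach(_.markPartitionCompleted(partitionId))
    managers.nonEmpty // false would signal that the stage has already finished
  }
}
```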


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

2018-04-23 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21131

[SPARK-23433][CORE] Late zombie task completions update all tasksets

A fetch failure leads to multiple task sets being active for a given
stage.  A late completion from an earlier attempt of the stage
should update the most recent attempt for the stage, so that it does
not submit another task for the same partition, and so that
it knows when it is completed and when it should be marked as
a "zombie".

Added a regression test.
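
As a rough illustration of that flow, with simplified, made-up types rather 
than the real scheduler classes:

```scala
import scala.collection.mutable

// A late success from a zombie attempt is propagated to every attempt of the
// stage, so the latest attempt stops re-submitting that partition.
final class AttemptSketch(val stageAttemptId: Int, numPartitions: Int) {
  var isZombie = false
  private val pending = mutable.Set((0 until numPartitions): _*)

  def markPartitionCompleted(partition: Int): Unit = {
    pending -= partition
    if (pending.isEmpty) isZombie = true // nothing left for this attempt to run
  }

  def pendingPartitions: Set[Int] = pending.toSet
}

object LateCompletionFlow {
  def main(args: Array[String]): Unit = {
    val attempts = Seq(new AttemptSketch(0, 3), new AttemptSketch(1, 3))
    attempts.head.isZombie = true            // earlier attempt hit a FetchFailed
    // A late success arrives from the zombie attempt for partition 2: tell
    // every attempt, not just the one that ran the task.
    attempts.foreach(_.markPartitionCompleted(2))
    println(attempts(1).pendingPartitions)   // Set(0, 1) -- partition 2 won't be re-launched
  }
}
```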

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-23433

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21131


commit 0720a7cd6826614e516c3d3a51bd4519259cbe3b
Author: Imran Rashid 
Date:   2018-02-21T20:21:14Z

[SPARK-23433][CORE] Late zombie task completions update all tasksets

After a fetch failure and stage retry, we may have multiple tasksets
which are active for a given stage.  A late completion from an earlier
attempt of the stage should update the most recent attempt for the
stage, so it does not try to submit another task for the same partition,
and so that it knows when it is completed.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org