[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-26 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184462542
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     }
   }

+  /**
+   * This tests the case where the origin task succeeds after its speculative
+   * attempt got a FetchFailed earlier.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partitions.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself succeeds soon after.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change, otherwise the child
+    // stage can never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

Right. It is a check that we are cleaning up the contents of the 
DAGScheduler's data structures so that they do not grow without bound over time.
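A minimal sketch of what such a helper might look like (the field names below are illustrative; the real suite helper inspects considerably more of the DAGScheduler's internal maps):

```scala
// Illustrative sketch, not the actual suite helper: after a job finishes,
// every piece of per-job bookkeeping should be empty again, otherwise the
// scheduler's memory footprint grows with every job it has ever run.
private def assertDataStructuresEmpty(): Unit = {
  assert(scheduler.activeJobs.isEmpty)
  assert(scheduler.failedStages.isEmpty)
  assert(scheduler.waitingStages.isEmpty)
  assert(scheduler.runningStages.isEmpty)
}
```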


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184276403
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

Yep, you're right. The success completion event in the UT was treated as a normal successful task. I fixed this by ignoring the event at the beginning of handleTaskCompletion.
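For reference, the shape of that fix is roughly the following (a hedged sketch of the idea only, not the exact patch that was merged):

```scala
// Sketch: when a late Success arrives for a stage that already failed with a
// FetchFailed and is waiting to be resubmitted, drop the event up front
// instead of registering the output against the stale stage attempt.
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
  val stage = stageIdToStage(event.task.stageId)
  if (event.reason == Success && failedStages.contains(stage)) {
    logInfo(s"Ignoring late success of ${event.task}: stage $stage already failed")
    return
  }
  // ... normal completion handling continues here ...
}
```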


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184274946
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

Ah, it's used to check that the job completed successfully and that all temporary structures are empty.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184266100
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

And it seems Spark will wrongly issue a job end event; can you check that in the test?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184265979
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

> INFO DAGScheduler: ResultStage 1 () finished in 0.136 s

This is unexpected, isn't it?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184260815
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

I mean the last line, `assertDataStructuresEmpty`


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184260597
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

The successful task will be ignored by `OutputCommitCoordinator.taskCompleted`: in the taskCompleted logic, `stageStates.getOrElse` returns early because the current stage is in the failed set. The detailed log is provided below:
```
18/04/26 10:50:24.524 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:74) and ResultStage 1 () due to fetch failure
18/04/26 10:50:24.535 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 2
18/04/26 10:50:24.538 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Executor lost: exec-hostA (epoch 1)
18/04/26 10:50:24.540 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Shuffle files lost for executor: exec-hostA (epoch 1)
18/04/26 10:50:24.545 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 3
18/04/26 10:50:24.552
```
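The early return described above follows the common `getOrElse`-with-`return` pattern; a simplified sketch of `taskCompleted` (assumed shape, reduced from the real coordinator, which tracks per-attempt commit state):

```scala
// Simplified sketch: once a stage has failed and been removed from
// stageStates, completion events for its tasks are silently dropped.
private[scheduler] def taskCompleted(stage: Int, partition: Int): Unit = synchronized {
  val stageState = stageStates.getOrElse(stage, {
    logDebug(s"Ignoring task completion for completed stage $stage")
    return
  })
  // ... otherwise, record or clear the authorized committer for the partition ...
}
```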

[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184260210
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

I added this test to answer your previous question, "Can you simulate what happens to a result task if FetchFailed comes before the task succeeds?". This test passes even without my code change in DAGScheduler.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184255020
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
[...]
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task of rddB success
+assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, 
"ignored"),
+  null))
+// Make sure failedStage is not empty now
+assert(scheduler.failedStages.nonEmpty)
+// The second result task self success soon.
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

Where is the code in `DAGScheduler` where we ignore this task?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184254856
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore 
following " +
+"successful tasks") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, 
"ignored"),
+  null))
+// Check currently missing partition.
+
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size 
=== 1)
+// The second result task self success soon.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+// Missing partition number should not change, otherwise it will cause 
child stage
+// never succeed.
+
assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size 
=== 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore 
following " +
+"successful tasks") {
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+submit(rddB, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task of rddB success
+assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, 
"ignored"),
+  null))
+// Make sure failedStage is not empty now
+assert(scheduler.failedStages.nonEmpty)
+// The second result task self success soon.
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+assertDataStructuresEmpty()
--- End diff --

what does this test?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184109204
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

Added a UT simulating this scenario happening to a result task.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-20 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r183198368
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

>Sorry I may nitpick here.

No, that's necessary; I do need to make sure about this. Thanks for your 
advice! :)

> Can you simulate what happens to result task if FechFaileded comes before 
task success?

Sure, but it may be hard to reproduce in a real environment; I'll try to 
simulate it in a UT first ASAP.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182340309
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

seems we may mistakenly mark a job as finished?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182338127
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

Sorry, I may be nitpicking here. Can you simulate what happens to a result task 
if FetchFailed comes before task success?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182336811
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

ResultStage usually has children. e.g. rdd1 depends on rdd2 with a shuffle 
dependency. Then the job has 2 stages, the result stage depends on the shuffle 
map stage.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182309137
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

This also confused me before. As far as I'm concerned, the result task in such 
a scenario (speculative task fails but the original task succeeds) is OK 
because it has no child stage; we can use the successful task's result and 
`markStageAsFinished`. But for a shuffle map task, it causes an inconsistency 
between mapOutputTracker and the stage's pendingPartitions, which must be fixed.
I'm not sure about ResultTask's behavior; can you give some advice?
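The inconsistency described above can be made concrete with a small standalone sketch (illustrative names, not Spark's real data structures): the late Success clears the pending partition, but the map output lost to the FetchFailed is never re-registered, so the stage looks finished while the child stage can never fetch its input.

```scala
// Toy model (illustrative names only) of the mapOutputTracker /
// pendingPartitions mismatch caused by a Success arriving after a FetchFailed.
object InconsistencyModel {
  // Partitions the shuffle map stage still has to finish.
  val pendingPartitions = scala.collection.mutable.Set(0, 1)
  // partition -> host serving its map output.
  val mapOutputs = scala.collection.mutable.Map(0 -> "hostA", 1 -> "hostA")

  def onFetchFailed(partition: Int): Unit =
    // The FetchFailed unregisters the lost map output; it must be recomputed.
    mapOutputs -= partition

  def onLateSuccessBuggy(partition: Int): Unit =
    // Buggy path: the late Success still clears the pending partition,
    // without re-registering the map output for the retried stage.
    pendingPartitions -= partition

  def stageLooksFinished: Boolean = pendingPartitions.isEmpty
  def childStageCanFetch: Boolean = mapOutputs.keySet == Set(0, 1)
}
```

In this sketch the stage "completes" (`pendingPartitions` empty) even though one map output is gone, which is exactly the state the child stage can never recover from.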


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182308871
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
--- End diff --

Maybe you can `runEvent(SpeculativeTaskSubmitted)` first, to simulate a 
speculative task being submitted before you `runEvent(makeCompletionEvent())`.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182307786
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
--- End diff --

Thanks, I'll change it.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182307728
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
--- End diff --

Here we only need to mock that the speculative task's failure event comes 
before the success event; `makeCompletionEvent` with the same task set's task 
achieves that. The same approach is used in `task events always posted in 
speculation / when stage is killed`.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182299915
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1266,6 +1266,9 @@ class DAGScheduler(
 }
 if (failedEpoch.contains(execId) && smt.epoch <= 
failedEpoch(execId)) {
   logInfo(s"Ignoring possibly bogus $smt completion from 
executor $execId")
+} else if (failedStages.contains(shuffleStage)) {
--- End diff --

Why do we only have a problem with the shuffle map task and not the result task?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182299803
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
+// Create 3 RDDs with shuffle dependencies on each other: rddA <--- 
rddB <--- rddC
+val rddA = new MyRDD(sc, 2, Nil)
+val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+val shuffleIdA = shuffleDepA.shuffleId
+
+val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = 
mapOutputTracker)
+val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = 
mapOutputTracker)
+
+submit(rddC, Array(0, 1))
+
+// Complete both tasks in rddA.
+assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", 2)),
+  (Success, makeMapStatus("hostB", 2
+
+// The first task success
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+// The second task's speculative attempt fails first, but task self 
still running.
+// This may caused by ExecutorLost.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
--- End diff --

Sorry, I'm not very familiar with this test suite; how can you tell it's a 
speculative task?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182299503
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where origin task success after speculative task 
got FetchFailed
+   * before.
+   */
+  test("[SPARK-23811] FetchFailed comes before Success of same task will 
cause child stage" +
+" never succeed") {
--- End diff --

nit: the test name should describe the expected behavior, not the wrong one, 
e.g. `SPARK-23811: stage failed by FetchFailed should ignore following 
successful tasks`


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182296297
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
+  logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
+" because task " + index + " has already failed by FetchFailed")
+  return
--- End diff --

Yep, as @cloud-fan suggested, handling this in `DAGScheduler` is the better 
choice.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r182294068
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
--- End diff --

Many thanks to you both for the guidance; that's clearer, and the UT added to 
reproduce this problem can also be used to check the fix!


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181776729
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
--- End diff --

We should handle this case in DAGScheduler; there we can look up the stage by 
task id and see whether the stage has failed. That way we don't need 
`fetchFailedTaskIndexSet`.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181729645
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
+  logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
+" because task " + index + " has already failed by FetchFailed")
+  return
--- End diff --

We cannot simply `return` here; we should always send a task `CompletionEvent` 
to the DAGScheduler, in case any listeners are waiting for it.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181732788
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
+  logInfo("Ignoring task-finished event for " + info.id + " in stage " 
+ taskSet.id +
+" because task " + index + " has already failed by FetchFailed")
+  return
--- End diff --

Maybe we can mark the task as `FAILED` with `UnknownReason` here. Then the 
DAGScheduler will treat this task as a no-op, and `registerMapOutput` will not 
be triggered. Though it's not an elegant way.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181674151
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
--- End diff --

The `fetchFailedTaskIndexSet` change is there to ignore the task success event 
if the stage is marked as failed, per Wenchen's suggestion in an earlier comment.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r181673508
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

Got it; I'll remove the code and UT in the next commit.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178537775
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

I don't think so. Useless tasks should fail soon (a FetchFailure usually means 
the mapper is down).


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-02 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178528414
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -750,6 +752,10 @@ private[spark] class TaskSetManager(
   if (tasksSuccessful == numTasks) {
 isZombie = true
   }
+} else if (fetchFailedTaskIndexSet.contains(index)) {
--- End diff --

Why are you making this change? I don't quite get it.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-02 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178500184
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

@jiangxb1987 Yes, ignoring the finished event is necessary; maybe we also need 
to kill the useless tasks?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-02 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178499952
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

@cloud-fan Yes, you're right; I should guarantee this in TaskSetManager.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-02 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178494006
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

This won't work. Maybe we should just ignore finished tasks submitted to a 
failed stage?


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r178470893
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -794,6 +794,19 @@ private[spark] class TaskSetManager(
 fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
 }
 
+// Kill any other attempts for this FetchFailed task
+for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
+s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
+s"as the attempt ${info.attemptNumber} failed because 
FetchFailed")
+  killedByOtherAttempt(index) = true
+  sched.backend.killTask(
--- End diff --

If this is async, we can't guarantee that there will be no task success events 
after marking the stage as failed, right?
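The race pointed out here can be sketched as a toy event queue (illustrative names): `killTask` only requests a kill asynchronously, so a Success status update already in flight is still delivered after the stage is marked failed, which is why the late event has to be ignored on the receiving side rather than prevented by killing.

```scala
// Toy sketch (illustrative names) of why an asynchronous kill cannot prevent
// late Success events: the event is already queued before the kill lands.
object AsyncKillRace {
  val inFlight = scala.collection.mutable.Queue.empty[String]
  var stageFailed = false

  def executorSendsSuccess(): Unit = inFlight.enqueue("Success(task 1.1)")

  def markStageFailedAndKill(): Unit = {
    stageFailed = true
    // An asynchronous killTask cannot retract events already in inFlight.
  }

  // Events the scheduler still receives after the stage was failed.
  def deliveredAfterFailure(): List[String] = {
    executorSendsSuccess()     // success is sent first...
    markStageFailedAndKill()   // ...then the stage fails and kills attempts
    inFlight.dequeueAll(_ => true).toList
  }
}
```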


---
