[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184462542 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- Right. It is a check that we are cleaning up the contents of the DAGScheduler's data structures so that they do not grow without bound over time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184276403 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- Yep, you're right. The success completely event in UT was treated as normal success task. I fixed this by ignore this event at the beginning of handleTaskCompletion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184274946 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- Ah, it's used for check job successful complete and all temp structure empty. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184266100 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- and it seems Spark will wrongly isssue a job end event, can you check it in the test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184265979 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- > INFO DAGScheduler: ResultStage 1 () finished in 0.136 s This is unexpected, isn't it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260815 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- I mean the last line, `assertDataStructuresEmpty` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260597 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- The success task will be ignored by `OutputCommitCoordinator.taskCompleted`, in the taskCompleted logic, stageStates.getOrElse will return because the current stage is in failed set. 
The detailed log is provided below: ``` 18/04/26 10:50:24.524 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:74) and ResultStage 1 () due to fetch failure 18/04/26 10:50:24.535 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 2 18/04/26 10:50:24.538 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Executor lost: exec-hostA (epoch 1) 18/04/26 10:50:24.540 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Shuffle files lost for executor: exec-hostA (epoch 1) 18/04/26 10:50:24.545 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 3 18/04/26 10:50:24.552 ScalaTest-ru
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260210 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- I add this test for answering your previous question "Can you simulate what happens to result task if FechFaileded comes before task success?". This test can pass without my code changing in DAGScheduler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184255020 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- where is the code in `DAGScheduler` we ignore this task? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184254856 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. 
+ assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- what does this test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184109204 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- Added the UT for simulating this scenario happening to the result task. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r183198368 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- >Sorry I may nitpick here. No, that's necessary, I do have to make sure about this, thanks for your advice! :) > Can you simulate what happens to the result task if FetchFailed comes before task success? Sure, but it may be hard to reproduce this in a real env; I'll try to fake it in a UT first ASAP. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182340309 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- seems we may mistakenly mark a job as finished? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182338127 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- Sorry I may nitpick here. Can you simulate what happens to the result task if FetchFailed comes before task success? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182336811 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- ResultStage usually has children. e.g. rdd1 depends on rdd2 with a shuffle dependency. Then the job has 2 stages, the result stage depends on the shuffle map stage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182309137 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- This also confused me before. As far as I'm concerned, the result task in such a scenario (speculative task fails but original task succeeds) is ok because it has no child stage; we can use the successful task's result and `markStageAsFinished`. But for a shuffle map task, it will cause an inconsistency between mapOutputTracker and the stage's pendingPartitions, so it must be fixed. I'm not sure of ResultTask's behavior, can you give some advice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182308871 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), --- End diff -- Maybe, you can `runEvent(SpeculativeTaskSubmitted)` first to simulate a speculative task submitted before you `runEvent(makeCompletetionEvent())`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182307786 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { --- End diff -- Thanks, I'll change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182307728 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), --- End diff -- Here we only need to mock the speculative task failed event came before success event, `makeCompletionEvent` with same taskSets's task can achieve such goal. This also use in `task events always posted in speculation / when stage is killed`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182299915 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1266,6 +1266,9 @@ class DAGScheduler( } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") +} else if (failedStages.contains(shuffleStage)) { --- End diff -- Why we only have a problem with shuffle map task not result task? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182299803 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), --- End diff -- Sorry I'm not very familiar with this test suite, how can you tell it's a speculative task? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182299503 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { --- End diff -- nit: the test name should describe the expected behavior not the wrong one. `SPARK-23811: staged failed by FetchFailed should ignore following successful tasks` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182296297 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + +" because task " + index + " has already failed by FetchFailed") + return --- End diff -- Yep, as per @cloud-fan 's suggestion, handling this in `DAGScheduler` is a better choice. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182294068 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { --- End diff -- Great thanks to you two for your guidance; that's clearer, and the UT added for reproducing this problem can also be used for checking it! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181776729 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { --- End diff -- We should handle this case in DAGScheduler, then we can look up the stage by task id, and see if the stage is failed or not. Then we don't need `fetchFailedTaskIndexSet`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181729645 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + +" because task " + index + " has already failed by FetchFailed") + return --- End diff -- We cannot simply `return` here. And we should always send a task `CompletionEvent` to DAG, in case there are any listeners waiting for it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181732788 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + +" because task " + index + " has already failed by FetchFailed") + return --- End diff -- Maybe, we can mark the task as `FAILED` with `UnknownReason` here. And then, DAG will treat this task as a no-op, and `registerMapOutput` will not be triggered. Though, it is not an elegant way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181674151 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { --- End diff -- The change of `fetchFailedTaskIndexSet` is to ignore the task success event if the stage is marked as failed, as Wenchen's suggestion in before comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181673508 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- Got it, I remove the code and UT in next commit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178537775 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- I don't think so. Useless tasks should fail soon(FetchFailure usually means mapper is down). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178528414 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { --- End diff -- Why are you making this change? I don't quite get it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178500184 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- @jiangxb1987 Yes, ignore the finished event is necessary, maybe it's also needed to kill useless task? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178499952 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- @cloud-fan Yes you're right, I should guarantee this in TaskSetManager. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178494006 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- This should not work. Maybe we shall just ignore the finished tasks submitted to a failed stage? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r178470893 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -794,6 +794,19 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) } +// Kill any other attempts for this FetchFailed task +for (attemptInfo <- taskAttempts(index) if attemptInfo.running) { + logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + +s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + +s"as the attempt ${info.attemptNumber} failed because FetchFailed") + killedByOtherAttempt(index) = true + sched.backend.killTask( --- End diff -- if this is async, we can't guarantee to not have task success events after marking staging as failed, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org