[ https://issues.apache.org/jira/browse/SPARK-29551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weixiuli updated SPARK-29551: ----------------------------- Description: There is a regression when an executor is lost and this then causes a 'fetch failed' error. When an executor is lost for some reason (e.g. the external shuffle service or the host of the executor is lost) and, at the time of the loss, a reduce stage happens to fail to fetch from it, the current code only calls mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress), marking just one map output of the map stage as broken. However, the other map outputs on that executor are also unavailable, and they can only be recovered by a further, nested retry of the stage; this is the regression. The current code calls mapOutputTracker.removeOutputsOnHost(host) or mapOutputTracker.removeOutputsOnExecutor(execId) when a reduce stage fetch fails and the executor is still active, but it does NOT do so in the case described above. So the failedEpoch recorded for 'executor lost' should be distinguished from a fetchFailedEpoch recorded for 'fetch failed' to solve this problem. A unit test can be added to 'DAGSchedulerSuite.scala' to catch this problem: 
{code} test("All shuffle files on the slave should be cleaned up when slave lost test") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") init(conf) runEvent(ExecutorAdded("exec-hostA1", "hostA")) runEvent(ExecutorAdded("exec-hostA2", "hostA")) runEvent(ExecutorAdded("exec-hostB", "hostB")) val firstRDD = new MyRDD(sc, 3, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) val secondShuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)), (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)), (Success, makeMapStatus("hostB", 1, mapTaskId = 10)) )) // make sure our test setup is correct val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus1.count(_ != null) === 3) assert(initialMapStatus1.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) 
assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7)) val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) assert(initialMapStatus2.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10)) // kill exec-hostA2 runEvent(ExecutorLost("exec-hostA2", ExecutorKilled)) // reduce stage fails with a fetch failure from map stage from exec-hostA2 complete(taskSets(2), Seq( (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), secondShuffleId, 0L, 0, 0, "ignored"), null) )) // Here is the main assertion -- make sure that we de-register // the map outputs for both map stage from both executors on hostA val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") assert(mapStatus1(2).location.host === "hostB") val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) assert(mapStatus2(2).location.executorId === "exec-hostB") assert(mapStatus2(2).location.host === "hostB") } {code} The error output is: {code} 3 did not equal 1 ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609) Expected :1 Actual :3 <Click to see difference> org.scalatest.exceptions.TestFailedException: 3 did not equal 1 {code} was: There will be a regression when the executor lost and then causes 'fetch failed'. We can add an unittest in 'DAGSchedulerSuite.scala' to catch the above problem. 
{code} test("All shuffle files on the slave should be cleaned up when slave lost test") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") init(conf) runEvent(ExecutorAdded("exec-hostA1", "hostA")) runEvent(ExecutorAdded("exec-hostA2", "hostA")) runEvent(ExecutorAdded("exec-hostB", "hostB")) val firstRDD = new MyRDD(sc, 3, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) val secondShuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)), (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)), (Success, makeMapStatus("hostB", 1, mapTaskId = 10)) )) // make sure our test setup is correct val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus1.count(_ != null) === 3) assert(initialMapStatus1.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) 
assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7)) val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) assert(initialMapStatus2.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10)) // kill exec-hostA2 runEvent(ExecutorLost("exec-hostA2", ExecutorKilled)) // reduce stage fails with a fetch failure from map stage from exec-hostA2 complete(taskSets(2), Seq( (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), secondShuffleId, 0L, 0, 0, "ignored"), null) )) // Here is the main assertion -- make sure that we de-register // the map outputs for both map stage from both executors on hostA val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") assert(mapStatus1(2).location.host === "hostB") val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) assert(mapStatus2(2).location.executorId === "exec-hostB") assert(mapStatus2(2).location.host === "hostB") } {code} The error output is: {code} 3 did not equal 1 ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609) Expected :1 Actual :3 <Click to see difference> org.scalatest.exceptions.TestFailedException: 3 did not equal 1 {code} > There is a bug about fetch failed when an executor lost > -------------------------------------------------------- > > Key: SPARK-29551 > URL: https://issues.apache.org/jira/browse/SPARK-29551 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.4.3 > Reporter: weixiuli > Priority: Major > > There will be a regression when the executor lost and then causes 'fetch > failed'. 
> When an executor is lost for some reason (e.g. the external shuffle service or > the host of the executor is lost) and, at the time of the loss, a reduce stage > happens to fail to fetch from it, the current code only calls > mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress), marking > just one map output of the map stage as broken. However, the other map outputs > on that executor are also unavailable, and they can only be recovered by a > further, nested retry of the stage; this is the regression. > The current code calls > mapOutputTracker.removeOutputsOnHost(host) or > mapOutputTracker.removeOutputsOnExecutor(execId) when a reduce stage fetch > fails and the executor is still active, but it does NOT do so in the case > described above. > So the failedEpoch recorded for 'executor lost' should be distinguished from a > fetchFailedEpoch recorded for 'fetch failed' to solve this problem. > A unit test can be added to 'DAGSchedulerSuite.scala' to catch this > problem: > {code} > test("All shuffle files on the slave should be cleaned up when slave lost > test") { > // reset the test context with the right shuffle service config > afterEach() > val conf = new SparkConf() > conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") > conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") > init(conf) > runEvent(ExecutorAdded("exec-hostA1", "hostA")) > runEvent(ExecutorAdded("exec-hostA2", "hostA")) > runEvent(ExecutorAdded("exec-hostB", "hostB")) > val firstRDD = new MyRDD(sc, 3, Nil) > val firstShuffleDep = new ShuffleDependency(firstRDD, new > HashPartitioner(3)) > val firstShuffleId = firstShuffleDep.shuffleId > val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) > val shuffleDep = new ShuffleDependency(shuffleMapRdd, new > HashPartitioner(3)) > val secondShuffleId = shuffleDep.shuffleId > val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) > submit(reduceRdd, Array(0)) > // map stage1 completes successfully, with one task on each executor > 
complete(taskSets(0), Seq( > (Success, > MapStatus( > BlockManagerId("exec-hostA1", "hostA", 12345), > Array.fill[Long](1)(2), mapTaskId = 5)), > (Success, > MapStatus( > BlockManagerId("exec-hostA2", "hostA", 12345), > Array.fill[Long](1)(2), mapTaskId = 6)), > (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) > )) > // map stage2 completes successfully, with one task on each executor > complete(taskSets(1), Seq( > (Success, > MapStatus( > BlockManagerId("exec-hostA1", "hostA", 12345), > Array.fill[Long](1)(2), mapTaskId = 8)), > (Success, > MapStatus( > BlockManagerId("exec-hostA2", "hostA", 12345), > Array.fill[Long](1)(2), mapTaskId = 9)), > (Success, makeMapStatus("hostB", 1, mapTaskId = 10)) > )) > // make sure our test setup is correct > val initialMapStatus1 = > mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses > // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get > assert(initialMapStatus1.count(_ != null) === 3) > assert(initialMapStatus1.map{_.location.executorId}.toSet === > Set("exec-hostA1", "exec-hostA2", "exec-hostB")) > assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7)) > val initialMapStatus2 = > mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses > // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get > assert(initialMapStatus2.count(_ != null) === 3) > assert(initialMapStatus2.map{_.location.executorId}.toSet === > Set("exec-hostA1", "exec-hostA2", "exec-hostB")) > assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10)) > // kill exec-hostA2 > runEvent(ExecutorLost("exec-hostA2", ExecutorKilled)) > // reduce stage fails with a fetch failure from map stage from exec-hostA2 > complete(taskSets(2), Seq( > (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), > secondShuffleId, 0L, 0, 0, "ignored"), > null) > )) > // Here is the main assertion -- make sure that we de-register > // the map outputs for both map stage from both executors on hostA > val mapStatus1 = > 
mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses > assert(mapStatus1.count(_ != null) === 1) > assert(mapStatus1(2).location.executorId === "exec-hostB") > assert(mapStatus1(2).location.host === "hostB") > val mapStatus2 = > mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses > assert(mapStatus2.count(_ != null) === 1) > assert(mapStatus2(2).location.executorId === "exec-hostB") > assert(mapStatus2(2).location.host === "hostB") > } > {code} > The error output is: > {code} > 3 did not equal 1 > ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at > (DAGSchedulerSuite.scala:609) > Expected :1 > Actual :3 > <Click to see difference> > org.scalatest.exceptions.TestFailedException: 3 did not equal 1 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org