[ https://issues.apache.org/jira/browse/SPARK-29551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

weixiuli updated SPARK-29551:
-----------------------------
    Description: 
There is a regression when an executor is lost and the loss then causes a 'fetch failed'.

When an executor is lost for some reason (e.g. the external shuffle service or the host running the executor goes down) and a reduce-stage task then happens to report a fetch failure from that executor (which is really bad), the current code only calls mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress) to mark the single failed map output as broken in the map stage. The other map outputs on that executor are also unavailable, but they are only rediscovered and resubmitted through further nested stage retries, which is the regression.

As we all know, the scheduler normally calls mapOutputTracker.removeOutputsOnHost(host) or mapOutputTracker.removeOutputsOnExecutor(execId) when a reduce stage hits a fetch failure while the executor is still considered active, but it does NOT do so in the scenario above.
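
To see why the cleanup is skipped, here is a rough, self-contained model of the epoch guard. It is illustrative only: the method name removeExecutorAndUnregisterOutputs and the fileLost flag are paraphrased assumptions, not the exact DAGScheduler source. With an external shuffle service, the earlier ExecutorLost event records a failedEpoch entry without unregistering outputs; the later fetch failure is then filtered out by that same entry, so removeOutputsOnHost / removeOutputsOnExecutor never runs.

{code}
import scala.collection.mutable

// Toy model of the single shared epoch guard (illustrative, not Spark source).
object SharedEpochGuardDemo {
  private val failedEpoch = mutable.HashMap[String, Long]()
  // number of registered map outputs per executor
  private val registeredOutputs = mutable.HashMap("exec-hostA2" -> 2, "exec-hostB" -> 1)

  // Both the 'executor lost' and 'fetch failed' paths funnel into this guard.
  def removeExecutorAndUnregisterOutputs(execId: String, fileLost: Boolean, epoch: Long): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < epoch) {
      failedEpoch(execId) = epoch
      if (fileLost) registeredOutputs(execId) = 0 // stands in for removeOutputsOnExecutor/Host
    }
  }

  def main(args: Array[String]): Unit = {
    // 1. ExecutorLost arrives first. With an external shuffle service the shuffle files
    //    are assumed to still be served, so fileLost = false and the outputs stay registered.
    removeExecutorAndUnregisterOutputs("exec-hostA2", fileLost = false, epoch = 2L)
    // 2. A reduce task launched at an earlier epoch then reports FetchFailed from that
    //    executor. failedEpoch("exec-hostA2") >= the task's epoch, so the cleanup is
    //    skipped even though fileLost is now true.
    removeExecutorAndUnregisterOutputs("exec-hostA2", fileLost = true, epoch = 1L)
    println(registeredOutputs("exec-hostA2")) // still 2: the stale outputs survive
  }
}
{code}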

So we should distinguish the failedEpoch of 'executor lost' from the fetchFailedEpoch of 'fetch failed' to solve this problem.
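
One possible shape of such a fix, again only a sketch of the idea rather than the actual patch (the method names onExecutorLost / onFetchFailed / unregisterOutputsOn are hypothetical): keep a separate fetchFailedEpoch map, so that an earlier 'executor lost' entry in failedEpoch can no longer suppress the output cleanup triggered by a later fetch failure.

{code}
import scala.collection.mutable

// Sketch of the proposed separation (illustrative; names are hypothetical).
class TwoEpochGuards {
  private val failedEpoch = mutable.HashMap[String, Long]()      // updated on 'executor lost'
  private val fetchFailedEpoch = mutable.HashMap[String, Long]() // updated on 'fetch failed'

  def onExecutorLost(execId: String, epoch: Long, fileLost: Boolean): Unit = {
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < epoch) {
      failedEpoch(execId) = epoch
      if (fileLost) unregisterOutputsOn(execId)
    }
  }

  def onFetchFailed(execId: String, taskEpoch: Long): Unit = {
    // Guarded by its own epoch map: a previous ExecutorLost no longer hides the fact
    // that every map output on this executor (or host) has to be unregistered.
    if (!fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < taskEpoch) {
      fetchFailedEpoch(execId) = taskEpoch
      unregisterOutputsOn(execId)
    }
  }

  // Stands in for mapOutputTracker.removeOutputsOnExecutor / removeOutputsOnHost.
  private def unregisterOutputsOn(execId: String): Unit =
    println(s"unregistering all map outputs on $execId")
}
{code}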

We can add a unit test in 'DAGSchedulerSuite.scala' to catch this problem.

{code}
test("All shuffle files on the slave should be cleaned up when slave lost test") {
    // reset the test context with the right shuffle service config
    afterEach()
    val conf = new SparkConf()
    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
    init(conf)
    runEvent(ExecutorAdded("exec-hostA1", "hostA"))
    runEvent(ExecutorAdded("exec-hostA2", "hostA"))
    runEvent(ExecutorAdded("exec-hostB", "hostB"))
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
    val firstShuffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
    val secondShuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    submit(reduceRdd, Array(0))
    // map stage1 completes successfully, with one task on each executor
    complete(taskSets(0), Seq(
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
      (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
    ))
    // map stage2 completes successfully, with one task on each executor
    complete(taskSets(1), Seq(
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
      (Success, makeMapStatus("hostB", 1, mapTaskId = 10))
    ))
    // make sure our test setup is correct
    val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
    assert(initialMapStatus1.count(_ != null) === 3)
    assert(initialMapStatus1.map(_.location.executorId).toSet ===
      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
    assert(initialMapStatus1.map(_.mapId).toSet === Set(5, 6, 7))

    val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
    assert(initialMapStatus2.count(_ != null) === 3)
    assert(initialMapStatus2.map(_.location.executorId).toSet ===
      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
    assert(initialMapStatus2.map(_.mapId).toSet === Set(8, 9, 10))

    // kill exec-hostA2
    runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
    // reduce stage fails with a fetch failure from the map outputs on exec-hostA2
    complete(taskSets(2), Seq(
      (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
        secondShuffleId, 0L, 0, 0, "ignored"),
        null)
    ))
    // Here is the main assertion -- make sure that we de-register
    // the map outputs of both map stages from both executors on hostA
    val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
    assert(mapStatus1.count(_ != null) === 1)
    assert(mapStatus1(2).location.executorId === "exec-hostB")
    assert(mapStatus1(2).location.host === "hostB")

    val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
    assert(mapStatus2.count(_ != null) === 1)
    assert(mapStatus2(2).location.executorId === "exec-hostB")
    assert(mapStatus2(2).location.host === "hostB")
  }
{code}

The error output is:
{code}

3 did not equal 1
ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609)
Expected :1
Actual   :3

org.scalatest.exceptions.TestFailedException: 3 did not equal 1

{code}





> There is a bug in fetch failed handling when an executor is lost 
> --------------------------------------------------------
>
>                 Key: SPARK-29551
>                 URL: https://issues.apache.org/jira/browse/SPARK-29551
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: weixiuli
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
