Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8180#discussion_r37677383
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -990,6 +999,110 @@ class DAGSchedulerSuite
         assert(stackTraceString.contains("org.scalatest.FunSuite"))
       }
     
    +  test("simple map stage submission") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    +
    +    // Submit a map stage by itself
    +    submitMapStage(shuffleDep)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 1)),
    +      (Success, makeMapStatus("hostB", 1))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Submit a reduce job that depends on this map stage; it should directly do the reduce
    +    submit(reduceRdd, Array(0))
    +    complete(taskSets(1), Seq((Success, 42)))
    +    assert(results === Map(0 -> 42))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Check that if we submit the map stage again, no tasks run
    +    submitMapStage(shuffleDep)
    +    assert(results === Map(0 -> null, 1 -> null))
    +    assertDataStructuresEmpty()
    +  }
    +
    +  test("map stage submission with reduce stage also depending on the 
data") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    +
    +    // Submit the map stage by itself
    +    submitMapStage(shuffleDep)
    +
    +    // Submit a reduce job that depends on this map stage
    +    submit(reduceRdd, Array(0))
    +
    +    // Complete tasks for the map stage
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 1)),
    +      (Success, makeMapStatus("hostB", 1))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +
    +    // Complete tasks for the reduce stage
    +    complete(taskSets(1), Seq((Success, 42)))
    +    assert(results === Map(0 -> 42))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Check that if we submit the map stage again, no tasks run
    +    submitMapStage(shuffleDep)
    +    assert(results === Map(0 -> null, 1 -> null))
    +    assertDataStructuresEmpty()
    +  }
    +
    +  test("map stage submission with fetch failure") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +
    +    // Submit a map stage by itself
    +    submitMapStage(shuffleDep)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
    +      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch
    +    submit(reduceRdd, Array(0, 1))
    +    complete(taskSets(1), Seq(
    +      (Success, 42),
    +      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    +    // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
    +    // from, then TaskSet 3 will run the reduce stage
    +    scheduler.resubmitFailedStages()
    +    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
    +    complete(taskSets(3), Seq((Success, 43)))
    +    assert(results === Map(0 -> 42, 1 -> 43))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Run another reduce job without a failure; this should just work
    +    submit(reduceRdd, Array(0, 1))
    +    complete(taskSets(4), Seq(
    +      (Success, 44),
    +      (Success, 45)))
    +    assert(results === Map(0 -> 44, 1 -> 45))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +  }
    --- End diff ---
    
    I think we'll need a bunch more cases around stage failure, as this has been one of the most error-prone parts of the scheduler. Things like:
    * direct execution of a map stage after the stage has already failed from a result job that depends on it, and vice versa (a rough sketch of this case follows the list)
    * map stage retry with zombie task completion, so we get double completion of shuffle map tasks
    * map stage retry with zombie task completion, so map output is made available before any single task set has completed. (This doesn't currently work for normal jobs either, see SPARK-5259 -- but after that fix is in, it would be good to make sure it works with this case as well.)
    * map stage submission that in turn depends on a lineage of more than one shuffle dependency. Failure in one of the fetches should result in the proper set of retries
    * map stage submission with a fetch failure, which results in a retry of a "skipped" stage. (This is overdue for a test case for regular job submission in any case -- I have been meaning to do it. I can look at this and try to give you a test case for map stage submission as well.)
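    
    Here's a very rough sketch of the first case, reusing only the helpers this suite already has (`submitMapStage`, `submit`, `complete`, `makeMapStatus`, `FetchFailed`, etc.). The task-set indices and the exact post-failure behavior are my guesses at what the scheduler *should* do, so treat it as a starting point rather than a finished test:
    
    ```scala
    test("map stage submission after the stage failed in a result job") {
      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
      val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
      val shuffleId = shuffleDep.shuffleId
      val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    
      // Run a result job whose reduce stage hits a fetch failure, so the map
      // stage is marked as failed and hostA's output is forgotten
      submit(reduceRdd, Array(0, 1))
      complete(taskSets(0), Seq(
        (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
        (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
      complete(taskSets(1), Seq(
        (Success, 42),
        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    
      // Now submit the map stage directly, before the failed stages get
      // resubmitted; it should wait for the retry to recompute hostA's lost
      // output rather than report the stale map statuses as complete
      submitMapStage(shuffleDep)
      scheduler.resubmitFailedStages()
      complete(taskSets(2), Seq(
        (Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
      assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
        HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
      // (Plus completing the retried reduce stage and checking `results`; I've
      // left that out since the task-set ordering past this point is a guess.)
    }
    ```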

