Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8180#discussion_r37789233
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -990,6 +999,110 @@ class DAGSchedulerSuite
         assert(stackTraceString.contains("org.scalatest.FunSuite"))
       }
     
    +  test("simple map stage submission") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    +
    +    // Submit a map stage by itself
    +    submitMapStage(shuffleDep)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 1)),
    +      (Success, makeMapStatus("hostB", 1))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Submit a reduce job that depends on this map stage; it should directly do the reduce
    +    submit(reduceRdd, Array(0))
    +    complete(taskSets(1), Seq((Success, 42)))
    +    assert(results === Map(0 -> 42))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Check that if we submit the map stage again, no tasks run
    +    submitMapStage(shuffleDep)
    +    assert(results === Map(0 -> null, 1 -> null))
    +    assertDataStructuresEmpty()
    +  }
    +
    +  test("map stage submission with reduce stage also depending on the 
data") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    +
    +    // Submit the map stage by itself
    +    submitMapStage(shuffleDep)
    +
    +    // Submit a reduce job that depends on this map stage
    +    submit(reduceRdd, Array(0))
    +
    +    // Complete tasks for the map stage
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 1)),
    +      (Success, makeMapStatus("hostB", 1))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +
    +    // Complete tasks for the reduce stage
    +    complete(taskSets(1), Seq((Success, 42)))
    +    assert(results === Map(0 -> 42))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Check that if we submit the map stage again, no tasks run
    +    submitMapStage(shuffleDep)
    +    assert(results === Map(0 -> null, 1 -> null))
    +    assertDataStructuresEmpty()
    +  }
    +
    +  test("map stage submission with fetch failure") {
    +    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleId = shuffleDep.shuffleId
    +    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    +
    +    // Submit a map stage by itself
    +    submitMapStage(shuffleDep)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
    +      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
    +    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
    +      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    +    assert(results === Map(0 -> null, 1 -> null))
    +    results.clear()
    +    assertDataStructuresEmpty()
    +
    +    // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch
    +    submit(reduceRdd, Array(0, 1))
    +    complete(taskSets(1), Seq(
    +      (Success, 42),
    +      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    +    // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
    +    // from, then TaskSet 3 will run the reduce stage
    +    scheduler.resubmitFailedStages()
    +    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
    --- End diff --
    
    As part of SPARK-5945, @ilganeli is introducing a few util methods that make this a little clearer, e.g. [`completeNextShuffleMapSuccessfully`](https://github.com/apache/spark/pull/5636/files#diff-f3b410b16818d8f34bb1eb4120a60d51R516). IMO that helps make these test cases a bit more readable -- right now it's hard for the reader to know what `taskSets(2)` is. Also, if someone makes a change that breaks things, it's much clearer if they can see, e.g., that they never launched attempt 1 for stage 0, rather than just getting some strange behavior later on.
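    
    For reference, a rough sketch of the kind of helper I mean (the name and signature here are just for illustration -- the actual util in #5636 may differ; this assumes the suite's existing `taskSets`, `complete`, and `makeMapStatus` helpers):
    
    ```scala
    /**
     * Sketch only: complete a shuffle map stage attempt by stage id / attempt id,
     * failing loudly if that attempt was never launched, instead of blindly
     * indexing into `taskSets`.
     */
    private def completeShuffleMapStageSuccessfully(
        stageId: Int,
        attemptIdx: Int,
        numShufflePartitions: Int): Unit = {
      val taskSet = taskSets.find { ts =>
        ts.stageId == stageId && ts.stageAttemptId == attemptIdx
      }.getOrElse(fail(s"never launched attempt $attemptIdx for stage $stageId"))
      complete(taskSet, taskSet.tasks.zipWithIndex.map { case (_, idx) =>
        // Alternate hosts so map outputs land on distinct block managers.
        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
      }.toSeq)
    }
    ```
    
    With something like that, the `complete(taskSets(2), ...)` call above could read `completeShuffleMapStageSuccessfully(0, 1, reduceRdd.partitions.size)`, which documents that we expect attempt 1 of stage 0 at that point.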

