Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15213#discussion_r80993655
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2105,6 +2106,61 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
         assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, 
shuffleDepC))
       }
     
    +  test("SPARK-17644: After one stage is aborted for too many failed 
attempts, subsequent stages" +
    +    "still behave correctly on fetch failures") {
    +    // Runs a job that always encounters a fetch failure, so should 
eventually be aborted
    +    def runJobWithPersistentFetchFailure: Unit = {
    +      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 
1)).groupByKey()
    +      val shuffleHandle =
    +        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, 
_]].shuffleHandle
    +      rdd1.map {
    +        case (x, _) if (x == 1) =>
    +          throw new FetchFailedException(
    +            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, 
"test")
    +        case (x, _) => x
    +      }.count()
    +    }
    +
    +    // Runs a job that encounters a single fetch failure but succeeds on 
the second attempt
    +    def runJobWithTemporaryFetchFailure: Unit = {
    +      object FailThisAttempt {
    +        val _fail = new AtomicBoolean(true)
    +      }
    +      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 
1)).groupByKey()
    +      val shuffleHandle =
    +        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, 
_]].shuffleHandle
    +      rdd1.map {
    +        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) 
=>
    +          throw new FetchFailedException(
    +            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, 
"test")
    +      }
    +    }
    +
    +    failAfter(10.seconds) {
    +      val e = intercept[SparkException] {
    +        runJobWithPersistentFetchFailure
    +      }
    +      
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    +    }
    +
    +    // Run a second job that will fail due to a fetch failure.
    +    // This job will hang without the fix for SPARK-17644.
    +    failAfter(10.seconds) {
    +      val e = intercept[SparkException] {
    +        runJobWithPersistentFetchFailure
    +      }
    +      
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    +    }
    +
    +    failAfter(10.seconds) {
    +      try {
    +        runJobWithTemporaryFetchFailure
    +      } catch {
    +        case e: Throwable => fail("A job with one fetch failure should 
eventually succeed")
    --- End diff --
    
    nit: you don't need to specifically catch the exception and call `fail`; the 
test will automatically fail from an unhandled exception


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to