Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15213#discussion_r80845128
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2105,6 +2107,59 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
         assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, 
shuffleDepC))
       }
     
    +  test("After one stage is aborted for too many failed attempts, 
subsequent stages" +
    +    "still behave correctly on fetch failures") {
    +    def fetchFailJob: Unit = {
    +      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 
1)).groupByKey()
    +      val shuffleHandle =
    +        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, 
_]].shuffleHandle
    +      rdd1.map {
    +        case (x, _) if (x == 1) =>
    +          throw new FetchFailedException(
    +            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, 
"test")
    +        case (x, _) => x
    +      }.count()
    +    }
    +
    +    def successJob: Unit = {
    +      object FailThisAttempt {
    +        val _fail = new AtomicBoolean(true)
    +      }
    +      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 
1)).groupByKey()
    +      val shuffleHandle =
    +        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, 
_]].shuffleHandle
    +      rdd1.map {
    +        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) 
=>
    +          throw new FetchFailedException(
    +            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, 
"test")
    +      }
    +    }
    +
    +    failAfter(60.seconds) {
    +      val e = intercept[SparkException] {
    +        fetchFailJob
    +      }
    +      
assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    +    }
    +
    +    // The following job that fails due to fetching failure will hang 
without
    +    // the fix for SPARK-17644
    +    failAfter(60.seconds) {
    --- End diff --
    
    I think a shorter timeout would be appropriate here to avoid slowness when 
this fails...maybe 10 seconds? That still seems plenty conservative since the 
resubmit timeout is 200 millis.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to