wuyi created SPARK-36558:
----------------------------

             Summary: Stage has all tasks finished but with ongoing finalization can cause job hang
                 Key: SPARK-36558
                 URL: https://issues.apache.org/jira/browse/SPARK-36558
             Project: Spark
          Issue Type: Sub-task
          Components: Spark Core
    Affects Versions: 3.2.0, 3.3.0
            Reporter: wuyi
A stage whose tasks have all finished but whose shuffle-merge finalization is still ongoing can cause the job to hang. The problem is that such a stage is still considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]), which breaks the original assumption that a "missing" stage must have tasks to run.

Normally, if stage A is the parent of (result) stage B and all tasks of stage A have finished, stage A is skipped directly when stage B is submitted. With this bug, however, stage A is submitted again, which ultimately hangs the job.

A test to reproduce the issue:

{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // By this, we can mimic a stage with all tasks finished
      // but finalization incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)

  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()

  // Setting _shuffleMergedFinalized to true avoids the hang:
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
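For readers who want the intuition without the Spark internals, the hang described above can be sketched as a toy model (hypothetical names, not Spark's actual classes): with push-based shuffle, a parent stage's output only counts as available once merge finalization completes, so a stage with every task finished but finalization pending is still treated as "missing" and resubmitted, yet it has zero pending tasks, so nothing can ever complete to unblock the child stage.

{code:java}
// Toy model of the race (hypothetical names, not Spark's DAGScheduler API).
class StageModel {
    int numPartitions;
    int finishedTasks;
    boolean mergeFinalized; // push-based shuffle finalization flag

    StageModel(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Mirrors the "missing stage" check referenced in the issue:
    // a parent is resubmitted whenever its output is not yet available.
    boolean isMissing() {
        boolean outputAvailable = finishedTasks == numPartitions && mergeFinalized;
        return !outputAvailable;
    }

    // Tasks left to run if the stage is resubmitted.
    int pendingTasks() {
        return numPartitions - finishedTasks;
    }
}

public class Main {
    public static void main(String[] args) {
        StageModel stageA = new StageModel(20);
        stageA.finishedTasks = 20;      // all tasks done
        stageA.mergeFinalized = false;  // finalization still ongoing

        // Stage A is considered missing, so it gets resubmitted...
        System.out.println("missing: " + stageA.isMissing());
        // ...but it has no tasks left to run, so nothing ever completes
        // to flip it to "available", and the job hangs.
        System.out.println("pending: " + stageA.pendingTasks());
    }
}
{code}

Once finalization completes (mergeFinalized = true), isMissing() returns false and the stage is skipped as expected, which is why forcing _shuffleMergedFinalized in the repro avoids the hang.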