[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403507#comment-17403507 ]
Venkata krishnan Sowrirajan commented on SPARK-36558: ----------------------------------------------------- [~Ngone51] I am not sure if you can run the tests that way, because then the `DAGSchedulerSuite` custom `DAGScheduler` or `DAGSchedulerEventLoopTester` won't be used therefore it submits the job with the default `DAGScheduler` and waits in the `finalizeShuffleMerge` which cannot complete because of our mocked up merger locations. Btw below is the test code along with the above changes you mentioned added to `MyRDD`. Am I missing something? {code:java} test("Job hang") { initPushBasedShuffleConfs(conf) conf.set("spark.shuffle.push.mergerLocations.minThreshold", "5") DAGSchedulerSuite.clearMergerLocs DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) val latch = new CountDownLatch(1) scheduler = new MyDAGScheduler( sc, taskScheduler, sc.listenerBus, mapOutputTracker, blockManagerMaster, sc.env) { override private[spark] def scheduleShuffleMergeFinalize( stage: ShuffleMapStage): Unit = { // By this, we can mimic a stage with all tasks finished // but finalization is incomplete. latch.countDown() super.scheduleShuffleMergeFinalize(stage) } } dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) val parts = 5 val shuffleMapRdd = new MyRDD(sc, parts, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) reduceRdd1.count() latch.await() // scalastyle:off println("=========after wait==========") // set _shuffleMergedFinalized to true can avoid the hang. val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) reduceRdd2.count() } {code} > Stage has all tasks finished but with ongoing finalization can cause job hang > ----------------------------------------------------------------------------- > > Key: SPARK-36558 > URL: https://issues.apache.org/jira/browse/SPARK-36558 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Affects Versions: 3.2.0, 3.3.0 > Reporter: wuyi > Priority: Blocker > > > For a stage that all tasks are finished but with ongoing finalization can > lead to job hang. The problem is that such stage is considered as a "missing" > stage (see > [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721).] > And it breaks the original assumption that a "missing" stage must have tasks > to run. > Normally, if stage A is the parent of (result) stage B and all tasks have > finished in stage A, stage A will be skipped directly when submitting stage > B. However, with this bug, stage A will be submitted. And submitting a stage > with no tasks to run would not be able to add its child stage into the > waiting stage list, which leads to the job hang in the end. > > The example to reproduce: > First, change `MyRDD` to allow it to compute: > {code:java} > override def compute(split: Partition, context: TaskContext): Iterator[(Int, > Int)] = { > Iterator.single((1, 1)) > }{code} > Then run this test: > {code:java} > test("Job hang") { > initPushBasedShuffleConfs(conf) > conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5) > DAGSchedulerSuite.clearMergerLocs > DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", > "host5")) > val latch = new CountDownLatch(1) > val myDAGScheduler = new MyDAGScheduler( > sc, > sc.dagScheduler.taskScheduler, > sc.listenerBus, > sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], > sc.env.blockManager.master, > sc.env) { > override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = > { > // By this, we can mimic a stage with all tasks finished > // but finalization is incomplete. > latch.countDown() > } > } > sc.dagScheduler = myDAGScheduler > sc.taskScheduler.setDAGScheduler(myDAGScheduler) > val parts = 20 > val shuffleMapRdd = new MyRDD(sc, parts, Nil) > val shuffleDep = new ShuffleDependency(shuffleMapRdd, new > HashPartitioner(parts)) > val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = > mapOutputTracker) > reduceRdd1.countAsync() > latch.await() > // scalastyle:off > println("=========after wait==========") > // set _shuffleMergedFinalized to true can avoid the hang. > // shuffleDep._shuffleMergedFinalized = true > val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep)) > reduceRdd2.count() > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org