[ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402944#comment-17402944 ]
Min Shen commented on SPARK-36558:
----------------------------------

This is an issue we previously resolved internally. It seems the upstream version of the code has diverged from our internal code here again.
[~vsowrirajan], could you please take a look?

> Stage has all tasks finished but with ongoing finalization can cause job hang
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-36558
>                 URL: https://issues.apache.org/jira/browse/SPARK-36558
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker
>
>
> A stage whose tasks have all finished but whose finalization is still
> ongoing can cause the job to hang. The problem is that such a stage is
> considered a "missing" stage (see
> [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]).
> This breaks the original assumption that a "missing" stage must have tasks
> to run.
> Normally, if stage A is the parent of (result) stage B and all tasks in
> stage A have finished, stage A is skipped directly when stage B is
> submitted. With this bug, however, stage A is submitted even though it has
> no tasks left to run, which eventually causes the job to hang.
>
> The example to reproduce:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // Skipping the real finalization mimics a stage whose tasks have
>       // all finished but whose finalization is incomplete.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // Setting _shuffleMergedFinalized to true avoids the hang.
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
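
The mismatch described in the report can be illustrated with a toy model. This is a minimal sketch, not Spark's code: `ToyMapStage`, its fields, and `MissingStageSketch` are hypothetical names. The `isMissing` condition is an assumption inferred from the description, namely that a stage counts as "missing" when either its outputs are unavailable or its finalization has not completed.

```scala
// Toy model only: these are NOT Spark's real classes or fields.
final case class ToyMapStage(
    numPartitions: Int,
    finishedPartitions: Int,
    mergeFinalized: Boolean) {

  // True once every partition has produced its map output.
  def isAvailable: Boolean = finishedPartitions == numPartitions

  // Number of partitions that would still need a task.
  def pendingPartitions: Int = numPartitions - finishedPartitions

  // Assumed condition: unfinished tasks OR unfinished finalization.
  def isMissing: Boolean = !isAvailable || !mergeFinalized
}

object MissingStageSketch {
  def main(args: Array[String]): Unit = {
    // All 20 tasks are done, but finalization has not completed.
    val stageA = ToyMapStage(numPartitions = 20, finishedPartitions = 20, mergeFinalized = false)
    println(s"missing=${stageA.isMissing}, pending=${stageA.pendingPartitions}")
  }
}
```

In this model the stage is reported as missing while having zero pending partitions, which is the combination that the "a missing stage must have tasks to run" assumption does not expect.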