[ https://issues.apache.org/jira/browse/SPARK-40987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-40987:
------------------------------------

    Assignee: Apache Spark

> Avoid creating a directory when deleting a block, causing DAGScheduler to not work
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-40987
>                 URL: https://issues.apache.org/jira/browse/SPARK-40987
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.2, 3.3.1
>            Reporter: dzcxzl
>            Assignee: Apache Spark
>            Priority: Minor
>
> When the driver submits a job, DAGScheduler calls sc.broadcast(taskBinaryBytes).
> TorrentBroadcast#writeBlocks may fail during blockManager#putBytes due to disk problems.
> BlockManager#doPut then calls BlockManager#removeBlockInternal to clean up the block, which in turn calls DiskStore#remove to delete the block file on disk.
> Because the block's directory does not exist, DiskStore#remove tries to create it, and an exception is thrown at this point.
> As a result, the block info and its lock in BlockInfoManager#blockInfoWrappers are never removed.
> The catch block in TorrentBroadcast#writeBlocks then calls blockManager.removeBroadcast to clean up the broadcast.
> Because the block lock in BlockInfoManager#blockInfoWrappers is never released, the dag-scheduler-event-loop thread of DAGScheduler waits forever.
>
> {code:java}
> 22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
> 22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
> {code}
>
> {code:java}
> "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>     at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
>     at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
>     at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
>     at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
>     at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
>     at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
>     at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
>     at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
>     at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
>     at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
>     at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
>     at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
>     at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
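The failure chain in the description boils down to a lock that is acquired for a block write and then leaked when the cleanup path itself throws. The following is a minimal, self-contained sketch of that pattern using plain java.util.concurrent locks; the names (LockLeakSketch, putBlock, removeBlock, blockLock) are hypothetical stand-ins, not Spark's actual BlockManager/BlockInfoManager code:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockLeakSketch {
    static final ReentrantLock blockLock = new ReentrantLock();

    // Stand-in for DiskStore#remove: cleanup itself throws because the
    // block's directory does not exist and creating it fails.
    static void removeBlock() {
        throw new RuntimeException("cannot create directory for block file");
    }

    // Stand-in for BlockManager#doPut: take the block's write lock, fail to
    // write, then attempt cleanup. Because cleanup throws, the lock is
    // never released -- this is the leak the issue describes.
    static void putBlock() {
        blockLock.lock();
        try {
            throw new RuntimeException("disk failure while writing block");
        } catch (RuntimeException writeFailure) {
            try {
                removeBlock();
            } catch (RuntimeException cleanupFailure) {
                // Swallowed, but blockLock is still held at this point.
                // A fix amounts to ensuring the lock/block info is removed
                // even when the on-disk cleanup fails.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        putBlock();
        // A second thread -- like dag-scheduler-event-loop calling
        // removeBroadcast -- now blocks on the leaked lock. Spark waits
        // forever; a tryLock with a timeout makes the hang observable here.
        Thread cleaner = new Thread(() -> {
            boolean acquired = false;
            try {
                acquired = blockLock.tryLock(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignored) {
            }
            System.out.println("cleanup thread acquired lock: " + acquired);
        });
        cleaner.start();
        cleaner.join();
    }
}
```

Running this prints `cleanup thread acquired lock: false`: the second thread can never take the lock, which mirrors the dag-scheduler-event-loop thread parked in BlockInfoManager#acquireLock in the thread dump above.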