holdenk commented on a change in pull request #28370: URL: https://github.com/apache/spark/pull/28370#discussion_r425941258
########## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ########## @@ -1829,7 +1901,58 @@ private[spark] class BlockManager( data.dispose() } + /** + * Class to handle block manager decommissioning retries + * It creates a Thread to retry offloading all RDD cache blocks + */ + private class BlockManagerDecommissionManager(conf: SparkConf) { + @volatile private var stopped = false + private val sleepInterval = conf.get( + config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) + + private val blockReplicationThread = new Thread { + override def run(): Unit = { + var failures = 0 + while (blockManagerDecommissioning + && !stopped + && !Thread.interrupted() + && failures < 20) { + try { + logDebug("Attempting to replicate all cached RDD blocks") + decommissionRddCacheBlocks() Review comment: We don't set `stop=true` here because we loop through this multiple times. It is possible that not all blocks will replicate in the first iteration and also possible that more blocks are stored while were decommissioning (e.g. any existing running tasks which have a persist). ########## File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ########## @@ -1551,30 +1555,36 @@ private[spark] class BlockManager( } /** - * Called for pro-active replenishment of blocks lost due to executor failures + * Replicates a block to peer block managers based on existingReplicas and maxReplicas * * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed + * @param maxReplicationFailures number of replication failures to tolerate before + * giving up. + * @return whether block was successfully replicated or not */ def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], - maxReplicas: Int): Unit = { + maxReplicas: Int, + maxReplicationFailures: Option[Int] = None): Boolean = { logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") - blockInfoManager.lockForReading(blockId).foreach { info => + blockInfoManager.lockForReading(blockId).forall { info => Review comment: Using `map` would give us back an `Option[Boolean]` and we just want a `boolean` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org