Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14412#discussion_r101432219

    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
         // Remove it from blockManagerInfo and remove all the blocks.
         blockManagerInfo.remove(blockManagerId)
    +
         val iterator = info.blocks.keySet.iterator
         while (iterator.hasNext) {
           val blockId = iterator.next
           val locations = blockLocations.get(blockId)
           locations -= blockManagerId
           if (locations.size == 0) {
             blockLocations.remove(blockId)
    +        logWarning(s"No more replicas available for $blockId !")
    +      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
    +        // only RDD blocks store data that users explicitly cache so we only need to proactively
    --- End diff --

Just a nit, but how about splitting this comment into two, one for each of the two separate concerns, for better readability? My suggestion:

```scala
// De-register the block if none of the block managers have it. Otherwise, if proactive
// replication is enabled, and the block is either an RDD block or a test block (the latter
// is used for unit testing), we send a message to a randomly chosen executor location to
// replicate the given block. Note that we ignore other block types (such as broadcast and
// shuffle blocks) as replication doesn't make much sense in that context.
if (locations.size == 0) {
  blockLocations.remove(blockId)
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
  // As a heuristic, assume a single executor failure to find the number of replicas that
  // existed before the failure.
  val maxReplicas = locations.size + 1

  val i = (new Random(blockId.hashCode)).nextInt(locations.size)
  val blockLocations = locations.toSeq
  val candidateBMId = blockLocations(i)
  val blockManager = blockManagerInfo.get(candidateBMId)
  if (blockManager.isDefined) {
    val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
    val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
    blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
  }
}
```
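To make the heuristic easier to reason about in isolation, here is a rough, self-contained sketch of just the selection step. It is not the PR's code: `ReplicationSketch` and `planReplication` are hypothetical names, plain strings stand in for `BlockId` and `BlockManagerId`, and the locations are sorted before picking, which is an assumption added here so the seeded choice does not depend on set iteration order.

```scala
import scala.util.Random

// Hypothetical stand-ins: plain strings replace BlockId and BlockManagerId.
object ReplicationSketch {
  // Given the locations that still hold a block after an executor was lost, return the
  // executor chosen to re-replicate it, the other locations that already hold it, and
  // the replica count to restore. Returns None when no replica is left.
  def planReplication(
      blockId: String,
      locations: Set[String]): Option[(String, Seq[String], Int)] = {
    if (locations.isEmpty) {
      None // nothing to copy from; the caller would de-register the block
    } else {
      // Heuristic from the suggestion above: assume exactly one executor failed.
      val maxReplicas = locations.size + 1
      // Assumption of this sketch: sort so the pick is independent of set ordering.
      val candidates = locations.toSeq.sorted
      // Seeding with the block's hash code makes the pick repeatable per block.
      val chosen = candidates(new Random(blockId.hashCode).nextInt(candidates.size))
      Some((chosen, candidates.filter(_ != chosen), maxReplicas))
    }
  }
}
```

With two surviving locations, for example, the sketch returns one of them as the replicating executor, the other as the only remaining location, and a target of three replicas.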