Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14412#discussion_r101432219
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
    @@ -195,17 +198,39 @@ class BlockManagerMasterEndpoint(
     
         // Remove it from blockManagerInfo and remove all the blocks.
         blockManagerInfo.remove(blockManagerId)
    +
         val iterator = info.blocks.keySet.iterator
         while (iterator.hasNext) {
           val blockId = iterator.next
           val locations = blockLocations.get(blockId)
           locations -= blockManagerId
           if (locations.size == 0) {
             blockLocations.remove(blockId)
    +        logWarning(s"No more replicas available for $blockId !")
    +      } else if (proactivelyReplicate && (blockId.isRDD || 
blockId.isInstanceOf[TestBlockId])) {
    +        // only RDD blocks store data that users explicitly cache so we 
only need to proactively
    --- End diff --
    
    Just a nit, but how about we split this comment into two separate ones for 
better readability around the two separate set of issues. My suggestion:
    
    ```scala
          // De-register the block if none of the block managers have it. 
Otherwise, if pro-active
          // replication is enabled, and a block is either an RDD or a test 
block (the latter is used
          // for unit testing), we send a message to a randomly chosen executor 
location to replicate
          // the given block. Note that we ignore other block types (such as 
broadcast/shuffle blocks
          // etc.) as replication doesn't make much sense in that context.
          if (locations.size == 0) {
            blockLocations.remove(blockId)
          } else if (proactivelyReplicate && (blockId.isRDD || 
blockId.isInstanceOf[TestBlockId])) {
          // As a heursitic, assume single executor failure to find out the 
number of replicas that
          // existed before failure
          val maxReplicas = locations.size + 1
    
          val i = (new Random(blockId.hashCode)).nextInt(locations.size)
          val blockLocations = locations.toSeq
          val candidateBMId = blockLocations(i)
          val blockManager = blockManagerInfo.get(candidateBMId)
          if(blockManager.isDefined) {
            val remainingLocations = locations.toSeq.filter(bm => bm != 
candidateBMId)
            val replicateMsg = ReplicateBlock(blockId, remainingLocations, 
maxReplicas)
            blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
          }
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to