Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19041#discussion_r143820628
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
    @@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
         blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
       }
     
    +  private def replicateOneBlock(
    +      execId: String,
    +      blockId: BlockId,
    +      excludeExecutors: Seq[String],
    +      context: RpcCallContext): Unit = {
    +    logDebug(s"replicating block $blockId")
    +    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
    +    val response: Option[Future[Boolean]] = for {
    +      blockManagerId <- blockManagerIdByExecutor.get(execId)
    +      info <- blockManagerInfo.get(blockManagerId)
    +      replicaSet <- blockLocations.asScala.get(blockId)
    +      replicas = replicaSet.toSeq
    +      maxReps = replicaSet.size + 2
    +    } yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, 
replicas, excluded, maxReps))
    +
    +    response.getOrElse(Future.successful(false)).foreach(context.reply)
    +  }
    +
    +  private def getCachedBlocks(executorId: String): collection.Set[BlockId] 
= {
    +    val cachedBlocks = for {
    +      blockManagerId <- blockManagerIdByExecutor.get(executorId)
    +      info <- blockManagerInfo.get(blockManagerId)
    +    } yield info.cachedBlocks
    +
    +    cachedBlocks.getOrElse(Set.empty)
    +  }
    +
    +  private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): 
Map[String, Long] = for {
    --- End diff --
    
    Put the body in `{ }` - looks kinda odd without it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to