GitHub user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r143820628 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def replicateOneBlock( + execId: String, + blockId: BlockId, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { + logDebug(s"replicating block $blockId") + val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) + val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + replicaSet <- blockLocations.asScala.get(blockId) + replicas = replicaSet.toSeq + maxReps = replicaSet.size + 2 + } yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, replicas, excluded, maxReps)) + + response.getOrElse(Future.successful(false)).foreach(context.reply) + } + + private def getCachedBlocks(executorId: String): collection.Set[BlockId] = { + val cachedBlocks = for { + blockManagerId <- blockManagerIdByExecutor.get(executorId) + info <- blockManagerInfo.get(blockManagerId) + } yield info.cachedBlocks + + cachedBlocks.getOrElse(Set.empty) + } + + private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = for { --- End diff -- Put the method body in `{ }`; a definition whose body is a bare `for` comprehension looks odd without braces.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org