Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r165488466

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }

+  private def recoverLatestRDDBlock(
+      execId: String,
+      excludeExecutors: Seq[String],
+      context: RpcCallContext): Unit = {
+    logDebug(s"Replicating first cached block on $execId")
+    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+    val response: Option[Future[Boolean]] = for {
+      blockManagerId <- blockManagerIdByExecutor.get(execId)
+      info <- blockManagerInfo.get(blockManagerId)
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+      // we assume blocks from the latest rdd are most relevant
+      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
+      replicaSet <- blockLocations.asScala.get(firstBlock)
+      // Add 2 below because you need the number of replicas, plus one for the original, plus one
+      // for the new replica.
+      maxReps = replicaSet.size + 2
+    } yield info.slaveEndpoint
+      .ask[Boolean](ReplicateBlock(firstBlock, replicaSet.toSeq, excluded, maxReps))
+      .flatMap { success =>
+        if (success) {
+          logTrace(s"Replicated block $firstBlock on executor $execId")
+          replicaSet -= blockManagerId
+          info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock))

--- End diff --

If I understand right, in order for the next iteration to avoid trying to remove the same block over again, you need this call to update the `info.cachedBlocks` used above. But I think that update is async -- even after this future has completed, it doesn't mean `info.cachedBlocks` has been updated.
I think what will happen in that case is that on the next iteration you'll try to remove the exact same block, which will fail on the executor because it's already been removed, and then back on the driver you'll decide to stop trying to replicate the rest of the blocks because of this apparent failure.
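One way to sidestep the stale `info.cachedBlocks` view would be for the driver to keep a local record of which blocks it has already asked to remove, and filter against that on each iteration instead of relying on the async metadata update. A minimal sketch of that idea -- note that `cachedBlocks`, `removeBlock`, and `recoverAll` here are illustrative stand-ins, not the actual Spark internals:

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object RecoverBlocksSketch {
  // Stand-in for the driver's view of info.cachedBlocks. In the scenario
  // described above, it is only updated asynchronously after the executor
  // actually removes a block, so it can be stale between iterations.
  val cachedBlocks: mutable.Set[String] =
    mutable.Set("rdd_2_0", "rdd_2_1", "rdd_1_0")

  // Simulated RemoveBlock RPC: the executor-side removal succeeds, but we
  // deliberately do NOT update cachedBlocks here, mimicking the lagging
  // driver-side metadata.
  def removeBlock(blockId: String): Future[Boolean] = Future { true }

  // Iterate over candidate blocks, filtering against a local set of blocks
  // we have already asked to remove, so a stale cachedBlocks view can never
  // make us pick (and then apparently fail on) the same block twice.
  def recoverAll(): Set[String] = {
    val alreadyAttempted = mutable.Set.empty[String]
    var done = false
    while (!done) {
      val candidates = cachedBlocks -- alreadyAttempted
      if (candidates.isEmpty) {
        done = true
      } else {
        val block = candidates.max // stand-in for the "latest rdd" ordering
        alreadyAttempted += block
        Await.result(removeBlock(block), 1.second)
      }
    }
    alreadyAttempted.toSet
  }

  def main(args: Array[String]): Unit =
    println(recoverAll().toSeq.sorted.mkString(","))
}
```

With this shape, each block is attempted exactly once and the loop terminates even though `cachedBlocks` never shrinks, which is the failure mode described above.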