Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19041#discussion_r165488466
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
         blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
       }
     
    +  private def recoverLatestRDDBlock(
    +      execId: String,
    +      excludeExecutors: Seq[String],
    +      context: RpcCallContext): Unit = {
    +    logDebug(s"Replicating first cached block on $execId")
    +    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
    +    val response: Option[Future[Boolean]] = for {
    +      blockManagerId <- blockManagerIdByExecutor.get(execId)
    +      info <- blockManagerInfo.get(blockManagerId)
    +      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
    +      // we assume blocks from the latest rdd are most relevant
    +      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
    +      replicaSet <- blockLocations.asScala.get(firstBlock)
    +      // Add 2 below because you need the number of replicas, plus one for the original, plus one
    +      // for the new replica.
    +      maxReps = replicaSet.size + 2
    +    } yield info.slaveEndpoint
    +      .ask[Boolean](ReplicateBlock(firstBlock, replicaSet.toSeq, excluded, maxReps))
    +      .flatMap { success =>
    +        if (success) {
    +          logTrace(s"Replicated block $firstBlock on executor $execId")
    +          replicaSet -= blockManagerId
    +          info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock))
    --- End diff --
    
    if I understand right, in order for the next iteration to avoid trying to remove the same block again, you need this call to update the `info.cachedBlocks` used above. But I think that update is async -- even after this future completes, that doesn't mean `info.cachedBlocks` has been updated.
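
    (Rough sketch of the flow I have in mind, using the names from the current code -- I haven't traced every hop, so treat this as an assumption:)

        // driver side: the ask completes as soon as the executor has removed
        // the block locally...
        info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock))
        // ...but the master's bookkeeping only changes later, when the executor
        // reports the removal back in a separate UpdateBlockInfo message --
        // that's what eventually takes firstBlock out of info.cachedBlocks.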
    
    I think what will happen in that case is that on the next iteration you'll try to remove the exact same block, which will fail on the executor because it's already been removed, and then back on the driver you'll decide to stop trying to replicate the rest of the blocks because of this apparent failure.
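
    One way around it (just a sketch, untested -- and note that mutating the master's state from a future callback may not be safe, since the endpoint normally processes messages on its own thread) might be to update the driver-side metadata eagerly once the remove succeeds:

        info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock)).map { removed =>
          if (removed) {
            // eagerly drop the block from the master's bookkeeping so the next
            // iteration no longer sees it in info.cachedBlocks
            // (assumes BlockManagerInfo.removeBlock also clears the block from
            // its cached-block set, as it does on the current master branch)
            info.removeBlock(firstBlock)
          }
          removed
        }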

