Kousuke Saruta created SPARK-3551:
-------------------------------------

             Summary: Remove redundant putting of FetchResults that indicate fetch failure when remote fetching
                 Key: SPARK-3551
                 URL: https://issues.apache.org/jira/browse/SPARK-3551
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle, Spark Core
    Affects Versions: 1.2.0
            Reporter: Kousuke Saruta
            Priority: Minor


In ShuffleBlockFetcherIterator#fetchLocalBlocks, when a local fetch fails, only the first failure is put into results (a LinkedBlockingQueue) and the method returns immediately.

{code}
  private[this] def fetchLocalBlocks() {
    // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
    // these all at once because they will just memory-map some files, so they won't consume
    // any memory that might exceed our maxBytesInFlight
    for (id <- localBlocks) {
      try {
        shuffleMetrics.localBlocksFetched += 1
        results.put(new FetchResult(
          id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
        logDebug("Got local block " + id)
      } catch {
        case e: Exception =>
          logError(s"Error occurred while fetching local blocks", e)
          results.put(new FetchResult(id, -1, null))
          return  // <-- fail fast: only the first failure is enqueued
      }
    }
{code}

In ShuffleBlockFetcherIterator#sendRequest, however, a failed FetchResult is put into results for every block in the failed request.

{code}
        override def onBlockFetchFailure(e: Throwable): Unit = {
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
          // Note that there is a chance that some blocks have been fetched
          // successfully, but we still add them to the failed queue. This is
          // fine because when the caller sees a FetchFailedException, it is
          // going to fail the entire task anyway.
          for ((blockId, size) <- req.blocks) {
            results.put(new FetchResult(blockId, -1, null))
          }
        }
{code}

I think putting all of the failed results is useless: once BlockStoreShuffleFetcher#unpackBlock meets the first failed result, it throws a FetchFailedException and the whole task fails, so the remaining failure entries are never consumed.
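
A minimal sketch of what a fail-fast version of onBlockFetchFailure could look like, assuming the same scope as the sendRequest snippet above (results, req, and FetchResult as shown); which block id is recorded for the failure does not matter, since the first failed result already fails the task:

{code}
        override def onBlockFetchFailure(e: Throwable): Unit = {
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
          // A single failure marker is enough: unpackBlock throws a
          // FetchFailedException on the first failed FetchResult it sees,
          // so enqueueing one per requested block is redundant.
          results.put(new FetchResult(req.blocks.head._1, -1, null))
        }
{code}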


