[ https://issues.apache.org/jira/browse/SPARK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14135666#comment-14135666 ]

Apache Spark commented on SPARK-3551:
-------------------------------------

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/2413

> Remove redundant putting of FetchResults which indicate fetch failure when fetching remote blocks
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3551
>                 URL: https://issues.apache.org/jira/browse/SPARK-3551
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Kousuke Saruta
>            Priority: Minor
>
> In ShuffleBlockFetcherIterator#fetchLocalBlocks, when a local fetch fails, 
> only the first failure is put into results (a LinkedBlockingQueue):
> {code}
>   private[this] def fetchLocalBlocks() {
>     // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
>     // these all at once because they will just memory-map some files, so they won't consume
>     // any memory that might exceed our maxBytesInFlight
>     for (id <- localBlocks) {
>       try {
>         shuffleMetrics.localBlocksFetched += 1
>         results.put(new FetchResult(
>           id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
>         logDebug("Got local block " + id)
>       } catch {
>         case e: Exception =>
>           logError(s"Error occurred while fetching local blocks", e)
>           results.put(new FetchResult(id, -1, null))
>           return  // <-------------------------------- fail fast.
>       }
>     }
> {code}
> But in ShuffleBlockFetcherIterator#sendRequest, all of the failed results 
> are put into results:
> {code}
>         override def onBlockFetchFailure(e: Throwable): Unit = {
>           logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
>           // Note that there is a chance that some blocks have been fetched successfully, but we
>           // still add them to the failed queue. This is fine because when the caller sees a
>           // FetchFailedException, it is going to fail the entire task anyway.
>           for ((blockId, size) <- req.blocks) {
>             results.put(new FetchResult(blockId, -1, null))
>           }
>         }
> {code}
> I think putting all of the failed results is useless, because when 
> BlockStoreShuffleFetcher#unpackBlock meets the first failed result, it 
> throws FetchFailedException (see the sketch below).
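>
> For illustration, a rough sketch of that consuming side (simplified and paraphrased, not the exact source; statuses and reduceId come from the surrounding BlockStoreShuffleFetcher.fetch context):
> {code}
>   // Simplified sketch of BlockStoreShuffleFetcher#unpackBlock: a failed
>   // FetchResult (size == -1) reaches this method as a None block, and the
>   // first None already fails the whole task with FetchFailedException,
>   // so any further failed results sitting in the queue are never read.
>   def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])): Iterator[T] = {
>     val (blockId, blockOption) = blockPair
>     blockOption match {
>       case Some(block) =>
>         block.asInstanceOf[Iterator[T]]
>       case None =>
>         blockId match {
>           case ShuffleBlockId(shufId, mapId, _) =>
>             val address = statuses(mapId.toInt)._1
>             throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
>           case _ =>
>             throw new SparkException(s"Failed to get block $blockId, which is not a shuffle block")
>         }
>     }
>   }
> {code}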


