[ https://issues.apache.org/jira/browse/SPARK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14135666#comment-14135666 ]
Apache Spark commented on SPARK-3551:
-------------------------------------

User 'sarutak' has created a pull request for this issue:
https://github.com/apache/spark/pull/2413

> Remove redundant putting FetchResult which means Fetch Fail when Remote fetching
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-3551
>                 URL: https://issues.apache.org/jira/browse/SPARK-3551
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Kousuke Saruta
>            Priority: Minor
>
> In ShuffleBlockFetcherIterator#fetchLocalBlocks, when a local fetch fails,
> only the first failure is put into results (a LinkedBlockingQueue):
> {code}
>   private[this] def fetchLocalBlocks() {
>     // Get the local blocks while remote blocks are being fetched. Note that it's okay
>     // to do these all at once because they will just memory-map some files, so they
>     // won't consume any memory that might exceed our maxBytesInFlight
>     for (id <- localBlocks) {
>       try {
>         shuffleMetrics.localBlocksFetched += 1
>         results.put(new FetchResult(
>           id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
>         logDebug("Got local block " + id)
>       } catch {
>         case e: Exception =>
>           logError(s"Error occurred while fetching local blocks", e)
>           results.put(new FetchResult(id, -1, null))
>           return // <-- fail fast: stop after the first failed local block
>       }
>     }
>   }
> {code}
> But in ShuffleBlockFetcherIterator#sendRequest, when a remote fetch fails, a
> failed FetchResult is put into results for every block of the request:
> {code}
>       override def onBlockFetchFailure(e: Throwable): Unit = {
>         logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
>         // Note that there is a chance that some blocks have been fetched successfully,
>         // but we still add them to the failed queue. This is fine because when the
>         // caller sees a FetchFailedException, it is going to fail the entire task anyway.
>         for ((blockId, size) <- req.blocks) {
>           results.put(new FetchResult(blockId, -1, null))
>         }
>       }
> {code}
> I think putting all of the failed results is redundant: as soon as
> BlockStoreShuffleFetcher#unpackBlock dequeues the first failed result, it
> throws FetchFailedException and the task fails, so the remaining failed
> results are never consumed.
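>
> A minimal sketch of what the deduplicated handler could look like, assuming
> the same surrounding context as the excerpts above (req, results, and
> FetchResult); this is only an illustration, not necessarily the actual change
> made in pull request #2413:
> {code}
>       override def onBlockFetchFailure(e: Throwable): Unit = {
>         logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
>         // One failed FetchResult is enough: the first failed result that
>         // BlockStoreShuffleFetcher#unpackBlock dequeues already makes it throw
>         // FetchFailedException and fail the entire task, so any further failed
>         // results for this request would never be consumed.
>         val (blockId, _) = req.blocks.head // req.blocks is non-empty for a sent request
>         results.put(new FetchResult(blockId, -1, null))
>       }
> {code}
> This mirrors the fail-fast behavior that fetchLocalBlocks already has for
> local blocks, so local and remote failures would be reported consistently.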