vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315832831
##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########

@@ -265,70 +277,82 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
-  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-    // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
-    // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-    // nodes, rather than blocking on reading output from one node.
-    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
-      + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)
-
-    // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
-    // at most maxBytesInFlight in order to limit the amount of data in flight.
-    val remoteRequests = new ArrayBuffer[FetchRequest]
+  private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+    logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: " +
+      s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: $maxBlocksInFlightPerAddress")
+
+    // Partition to local, host-local and remote blocks. Remote blocks are further split into
+    // FetchRequests of size at most maxBytesInFlight in order to limit the amount of data in flight
+    val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
     var localBlockBytes = 0L
+    var hostLocalBlockBytes = 0L
     var remoteBlockBytes = 0L
+    var numRemoteBlocks = 0
 
     for ((address, blockInfos) <- blocksByAddress) {
+      numBlocksToFetch += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        blockInfos.find(_._2 <= 0) match {
-          case Some((blockId, size)) if size < 0 =>
-            throw new BlockException(blockId, "Negative block size " + size)
-          case Some((blockId, size)) if size == 0 =>
-            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
-          case None => // do nothing.
-        }
+        checkBlockSizes(blockInfos)
         localBlocks ++= blockInfos.map(_._1)
         localBlockBytes += blockInfos.map(_._2).sum
-        numBlocksToFetch += localBlocks.size
+      } else if (enableHostLocalDiskReading && address.host == blockManager.blockManagerId.host) {

Review comment:
   Could you add a comment here that there's a case where not all of these blocks can be read from the local disk (because of the limit in the cache size), and that is handled later by `fetchHostLocalBlocks`? That also means that the log message printed later may not be 100% accurate, although that's a minor issue.
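   For illustration only, the requested comment could look roughly like the sketch below. Only the `else if` header is taken from the diff above; the branch body is a hypothetical stand-in for whatever the PR actually does there (`hostLocalBlocks` in particular is not visible in this hunk and is assumed):

      } else if (enableHostLocalDiskReading && address.host == blockManager.blockManagerId.host) {
        // Not all of these blocks are guaranteed to be read from the local disk: because of the
        // limit in the cache size, some of them may still have to be fetched, which is handled
        // later by fetchHostLocalBlocks(). The host-local bytes logged further down may therefore
        // be slightly inaccurate.
        checkBlockSizes(blockInfos)
        hostLocalBlocks ++= blockInfos.map(_._1)          // hypothetical: mirrors the local branch
        hostLocalBlockBytes += blockInfos.map(_._2).sum   // hypothetical: feeds the log message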