vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315832831
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##########
 @@ -265,70 +277,82 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
-  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-    // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
-    // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-    // nodes, rather than blocking on reading output from one node.
-    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
-      + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)
-
-    // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
-    // at most maxBytesInFlight in order to limit the amount of data in flight.
-    val remoteRequests = new ArrayBuffer[FetchRequest]
+  private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+    logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+      + s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: $maxBlocksInFlightPerAddress")
+
+    // Partition to local, host-local and remote blocks. Remote blocks are further split into
+    // FetchRequests of size at most maxBytesInFlight in order to limit the amount of data in flight
+    val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
     var localBlockBytes = 0L
+    var hostLocalBlockBytes = 0L
     var remoteBlockBytes = 0L
+    var numRemoteBlocks = 0
 
     for ((address, blockInfos) <- blocksByAddress) {
+      numBlocksToFetch += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        blockInfos.find(_._2 <= 0) match {
-          case Some((blockId, size)) if size < 0 =>
-            throw new BlockException(blockId, "Negative block size " + size)
-          case Some((blockId, size)) if size == 0 =>
-            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
-          case None => // do nothing.
-        }
+        checkBlockSizes(blockInfos)
         localBlocks ++= blockInfos.map(_._1)
         localBlockBytes += blockInfos.map(_._2).sum
-        numBlocksToFetch += localBlocks.size
+      } else if (enableHostLocalDiskReading && address.host == blockManager.blockManagerId.host) {
 
 Review comment:
   Could you add a comment here noting that there's a case where not all of these blocks can be read from the local disk (because of the limit on the cache size), and that that case is handled later by `fetchHostLocalBlocks`?
   
   That also means that the log message printed later may not be 100% accurate, although that's a minor issue.
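   For readers outside the PR: the branch this comment is attached to is the new three-way split performed by `partitionBlocksByFetchMode`. Below is a minimal, self-contained Scala sketch of just that classification logic; the `FetchMode` ADT and the simplified `BlockManagerId` here are illustrative stand-ins, not names from the PR. The reviewer's caveat concerns the `HostLocal` case: a block classified as host-local may still end up fetched over the network later if it cannot be read from the local disk.

```scala
// Sketch only (not the PR's code): how a block's address maps to a fetch mode.
// BlockManagerId is a simplified stand-in for Spark's real class of that name.
case class BlockManagerId(executorId: String, host: String)

sealed trait FetchMode
case object Local extends FetchMode      // same executor: read from our own disk store
case object HostLocal extends FetchMode  // same host, other executor: read its files directly
case object Remote extends FetchMode     // different host: fetch over the network

object FetchModeSketch {
  def fetchMode(
      address: BlockManagerId,
      self: BlockManagerId,
      hostLocalReadingEnabled: Boolean): FetchMode = {
    if (address.executorId == self.executorId) {
      Local
    } else if (hostLocalReadingEnabled && address.host == self.host) {
      HostLocal
    } else {
      Remote
    }
  }

  def main(args: Array[String]): Unit = {
    val self = BlockManagerId("exec-1", "host-a")
    println(fetchMode(BlockManagerId("exec-1", "host-a"), self, true))  // Local
    println(fetchMode(BlockManagerId("exec-2", "host-a"), self, true))  // HostLocal
    println(fetchMode(BlockManagerId("exec-3", "host-b"), self, true))  // Remote
  }
}
```

   In the real iterator, the same three-way decision additionally accumulates per-mode byte counts and builds the remote `FetchRequest`s; the sketch keeps only the classification step.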
