vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346979781
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##########
 @@ -420,12 +447,88 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
+  private[this] def fetchHostLocalBlock(
+      blockId: BlockId,
+      mapIndex: Int,
+      localDirs: Array[String],
+      blockManagerId: BlockManagerId): Boolean = {
+    try {
+      val buf = blockManager.getHostLocalShuffleData(blockId, localDirs)
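 +      // Retain the buffer so it is not released before the result is consumed.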
+      buf.retain()
 +      results.put(SuccessFetchResult(blockId, mapIndex, blockManagerId, buf.size(), buf,
 +        isNetworkReqDone = false))
+      true
+    } catch {
+      case e: Exception =>
+        // If we see an exception, stop immediately.
+        logError(s"Error occurred while fetching local blocks", e)
+        results.put(FailureFetchResult(blockId, mapIndex, blockManagerId, e))
+        false
+    }
+  }
+
 +  /**
 +   * Fetch the host-local blocks while we are fetching remote blocks. This is ok because
 +   * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
 +   * track in-memory are the ManagedBuffer references themselves.
 +   */
 +  private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = {
+    val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
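 +    // Split executors into those whose local dirs are already cached and
 +    // those for which the dirs still have to be fetched over RPC.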
+    val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
+      hostLocalBlocksByExecutor
+        .map { case (hostLocalBmId, bmInfos) =>
 +          (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
 +        }.partition(_._3.isDefined)
+    val bmId = blockManager.blockManagerId
+    val immutableHostLocalBlocksWithoutDirs =
+      hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
+        hostLocalBmId -> bmInfos
+      }.toMap
+    if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+      logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
+        s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
 +      val hostLocalDirsCompletable = new CompletableFuture[util.Map[String, Array[String]]]()
+      hostLocalDirManager.getHostLocalDirs(
+        immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray,
+        hostLocalDirsCompletable)
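 +      // Once the dirs arrive, read the blocks from disk; takeWhile stops at
 +      // the first failed fetch, since the failure already enqueued a
 +      // FailureFetchResult.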
+      hostLocalDirsCompletable.whenComplete((dirs, throwable) => {
 +        if (dirs != null) {
 +          immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, blockInfos) =>
+            blockInfos.takeWhile { case (blockId, _, mapIndex) =>
+              fetchHostLocalBlock(
+                blockId,
+                mapIndex,
+                dirs.get(hostLocalBmId.executorId),
+                hostLocalBmId)
+            }
+          }
+        } else {
+          logError(s"Error occurred while fetching host local blocks", 
throwable)
+          val (hostLocalBmId, blockInfoSeq) = 
immutableHostLocalBlocksWithoutDirs.head
+          val (blockId, _, mapIndex) = blockInfoSeq.head
+          results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, 
throwable))
 
 Review comment:
   If I'm following the code correctly, you're requiring that, when this feature is on, all reads from the same host are made from disk, right?
   
   Wouldn't it be better to allow remote requests when you detect that this information is missing, and replace pending remote requests with local ones once the RPC finishes? Not sure how easy that would be, though; a rough sketch of the idea follows.
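   
   A minimal sketch of that fallback, just to illustrate the shape of it. `enqueueRemoteRequest`, `pendingRemoteBlocks`, and `readFromLocalDisk` are hypothetical stand-ins for the iterator's real bookkeeping, not existing methods, and it assumes it lives inside `ShuffleBlockFetcherIterator` with the file's existing imports (`java.util`, `CompletableFuture`):
   
   ```scala
   private[this] def fetchWithRemoteFallback(
       missingDirBlocks: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       hostLocalDirManager: HostLocalDirManager): Unit = {
     // Queue the blocks as ordinary remote requests first, so the fetch makes
     // progress even while the dirs RPC is in flight.
     missingDirBlocks.foreach { case (bmId, blocks) =>
       enqueueRemoteRequest(bmId, blocks)  // hypothetical helper
     }
     val dirsFuture = new CompletableFuture[util.Map[String, Array[String]]]()
     hostLocalDirManager.getHostLocalDirs(
       missingDirBlocks.keys.map(_.executorId).toArray, dirsFuture)
     dirsFuture.whenComplete((dirs, throwable) => {
       if (dirs != null) {
         missingDirBlocks.foreach { case (bmId, blocks) =>
           blocks.foreach { case (blockId, _, mapIndex) =>
             // Swap in a local disk read only if the request has not been
             // sent over the wire yet; in-flight requests are left alone.
             if (pendingRemoteBlocks.remove(blockId)) {  // hypothetical state
               readFromLocalDisk(blockId, mapIndex, dirs.get(bmId.executorId), bmId)
             }
           }
         }
       }
       // On RPC failure nothing extra is needed: the remote requests queued
       // above still cover every block.
     })
   }
   ```
   
   The hard part is probably the race between this callback and the thread draining the remote request queue; a real version would need those two to synchronize around `pendingRemoteBlocks`.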
