This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 595e825  [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
595e825 is described below

commit 595e8251d16de6e5eb88bcad9b307f9aff88b842
Author: Chandni Singh <singh.chan...@gmail.com>
AuthorDate: Sat Jul 17 00:26:46 2021 -0500

    [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks
    
    ### What changes were proposed in this pull request?
    The following two bugs were introduced by https://github.com/apache/spark/pull/32140:
    1. Instead of requesting the local dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests them from other executors. Push-based shuffle is only enabled when the ESS is enabled, so it should always fetch the dirs from the ESS and not from other executors, which is not yet supported.
    2. The size of the push-merged blocks is logged incorrectly.
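    For fix (1), the choice of where to send the merged-dirs request can be sketched as below. This is a minimal illustration, not Spark's code: `LocalNode`, `Endpoint` and `MergedDirsTargetSketch` are hypothetical names. The "buggy" variant assumes that `localShuffleMergerBlockMgrId` carried the executor's own block-manager port; that is an inference consistent with the `NettyBlockRpcServer` error in the log below, not something this commit states.

    ```scala
    object MergedDirsTargetSketch {
      // Hypothetical, simplified view of what an executor knows about its own node.
      final case class LocalNode(
          host: String,
          blockManagerPort: Int,           // the executor's own block-transfer port
          externalShuffleServicePort: Int, // port of the ESS on the same host
          externalShuffleServiceEnabled: Boolean)

      final case class Endpoint(host: String, port: Int)

      // Push-merged blocks only exist when the ESS is enabled, so the ESS on the
      // executor's own host is the only valid target for the merged-dirs request.
      def mergedDirsTarget(node: LocalNode): Endpoint = {
        require(node.externalShuffleServiceEnabled,
          "push-merged blocks exist only when the external shuffle service is enabled")
        Endpoint(node.host, node.externalShuffleServicePort)
      }

      // Assumed behavior before the fix: the request reused the block-manager port,
      // so an executor's RPC server, not the ESS, received it and rejected it.
      def buggyTarget(node: LocalNode): Endpoint =
        Endpoint(node.host, node.blockManagerPort)
    }
    ```

    For example, with the ESS on Spark's default `spark.shuffle.service.port` of 7337, `mergedDirsTarget` returns `Endpoint(host, 7337)` regardless of the executor's own port.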
    
    ### Why are the changes needed?
    This fixes the above-mentioned bugs and is needed for push-based shuffle to work properly.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested this by running an application on a cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs`, which is why they didn't catch (1). However, the fix is trivial and checking this in the UT would require a lot more effort, so I haven't modified the UT.
    Logs of the executor with the bug:
    ```
    21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger]
    21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead
    java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92.
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    ```
    After the fix, the executors were able to fetch the local push-merged blocks.
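    The log above also shows the fallback path: when the merged dirs cannot be fetched, the executor fetches the original blocks instead. A minimal synchronous sketch of that decision follows, assuming a hypothetical `fetchDirs` function in place of the asynchronous `getHostLocalDirs` callback; `MergedDirsFallbackSketch`, `Outcome`, `ReadMerged` and `FetchOriginalBlocks` are illustrative names, not Spark APIs.

    ```scala
    import scala.util.{Failure, Success, Try}

    object MergedDirsFallbackSketch {
      // Hypothetical outcome of handling one push-merged-local block.
      sealed trait Outcome
      final case class ReadMerged(blockId: String, dirs: Seq[String]) extends Outcome
      final case class FetchOriginalBlocks(blockId: String, reason: String) extends Outcome

      // `fetchDirs` stands in (hypothetically) for the real asynchronous lookup.
      // On success, read the merged block from the returned dirs; on failure,
      // fall back to fetching the original, unmerged shuffle blocks.
      def handle(blockId: String, fetchDirs: () => Seq[String]): Outcome =
        Try(fetchDirs()) match {
          case Success(dirs) => ReadMerged(blockId, dirs)
          case Failure(e) =>
            FetchOriginalBlocks(blockId, Option(e.getMessage).getOrElse(e.getClass.getName))
        }
    }
    ```

    Modeling the result as a sealed trait keeps both branches explicit, which mirrors how the fetch iterator must account for a merged block either way.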
    
    Closes #33378 from otterc/SPARK-32922-followup.
    
    Authored-by: Chandni Singh <singh.chan...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 6d2cbadcfe3b4badad9400c3bedf697ea6196ffa)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../org/apache/spark/storage/PushBasedFetchHelper.scala    | 14 +++++++++-----
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala |  6 +++---
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index 63f42a0..096ea24 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -36,8 +36,8 @@ import org.apache.spark.storage.ShuffleBlockFetcherIterator._
  * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
  * functionality to fetch push-merged block meta and shuffle chunks.
  * A push-merged block contains multiple shuffle chunks where each shuffle chunk contains multiple
- * shuffle blocks that belong to the common reduce partition and were merged by the ESS to that
- * chunk.
+ * shuffle blocks that belong to the common reduce partition and were merged by the
+ * external shuffle service to that chunk.
  */
 private class PushBasedFetchHelper(
    private val iterator: ShuffleBlockFetcherIterator,
@@ -197,9 +197,13 @@ private class PushBasedFetchHelper(
           localShuffleMergerBlockMgrId)
       }
     } else {
-      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged dirs")
-      hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
-        localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
+      // Push-based shuffle is only enabled when the external shuffle service is enabled. If the
+      // external shuffle service is not enabled, then there will not be any push-merged blocks
+      // for the iterator to fetch.
+      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged " +
+        s"dirs from the external shuffle service")
+      hostLocalDirManager.getHostLocalDirs(blockManager.blockManagerId.host,
+        blockManager.externalShuffleServicePort, Array(SHUFFLE_MERGER_IDENTIFIER)) {
         case Success(dirs) =>
           logDebug(s"Fetched merged dirs in " +
             s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 094c3b5..d03f20a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -386,7 +386,7 @@ final class ShuffleBlockFetcherIterator(
         if (address.host == blockManager.blockManagerId.host) {
           numBlocksToFetch += blockInfos.size
           pushMergedLocalBlocks ++= blockInfos.map(_._1)
-          pushMergedLocalBlockBytes += blockInfos.map(_._3).sum
+          pushMergedLocalBlockBytes += blockInfos.map(_._2).sum
         } else {
           collectFetchRequests(address, blockInfos, collectedRemoteRequests)
         }
@@ -886,8 +886,8 @@ final class ShuffleBlockFetcherIterator(
           //    blockId is a ShuffleBlockChunkId.
           // 2. Failure to read the push-merged-local meta. In this case, the blockId is
           //    ShuffleBlockId.
-          // 3. Failure to get the push-merged-local directories from the ESS. In this case, the
-          //    blockId is ShuffleBlockId.
+          // 3. Failure to get the push-merged-local directories from the external shuffle service.
+          //    In this case, the blockId is ShuffleBlockId.
           if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
             numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
             bytesInFlight -= size
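
The one-character change in `ShuffleBlockFetcherIterator` (`_._3` to `_._2`) fixes bug (2). A minimal sketch of why, assuming each `blockInfos` entry is a `(BlockId, Long, Int)` tuple of block id, size in bytes and map index, and that push-merged blocks carry `-1` as the map index (as the `shuffle_0_-1_13` id in the log suggests). `PushMergedBytesSketch` and its `BlockId` are hypothetical stand-ins, not Spark's classes.

```scala
object PushMergedBytesSketch {
  // Hypothetical stand-in for Spark's BlockId; only the name is kept here.
  final case class BlockId(name: String)

  // Assumed shape of each blockInfos entry: (blockId, sizeInBytes, mapIndex).
  type BlockInfo = (BlockId, Long, Int)

  // Before the fix: sums the third field (the map index), not the size.
  def bytesBeforeFix(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._3.toLong).sum

  // After the fix: sums the second field, the block size in bytes.
  def bytesAfterFix(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._2).sum
}
```

With two push-merged blocks of 4096 and 1024 bytes, `bytesAfterFix` gives 5120, while `bytesBeforeFix` gives -2 (two map indices of -1), which is the kind of wrong size that ended up in the logs.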
