This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:

new 595e825 [SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks

595e825 is described below

commit 595e8251d16de6e5eb88bcad9b307f9aff88b842
Author: Chandni Singh <singh.chan...@gmail.com>
AuthorDate: Sat Jul 17 00:26:46 2021 -0500

[SPARK-32922][SHUFFLE][CORE][FOLLOWUP] Fixes few issues when the executor tries to fetch push-merged blocks

### What changes were proposed in this pull request?
The following two bugs were introduced by https://github.com/apache/spark/pull/32140:
1. Instead of requesting the local dirs for push-merged-local blocks from the ESS, `PushBasedFetchHelper` requests them from other executors. Push-based shuffle is only enabled when the ESS is enabled, so it should always fetch the dirs from the ESS and not from other executors, which is not yet supported.
2. The size of the push-merged blocks is logged incorrectly.

### Why are the changes needed?
This fixes the bugs mentioned above and is needed for push-based shuffle to work properly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested this by running an application on the cluster. The UTs mock the call `hostLocalDirManager.getHostLocalDirs`, which is why they didn't catch (1). The fix is trivial, and checking this in the UT would require a lot more effort, so I haven't modified the UT.

Logs of the executor with the bug:
```
21/07/15 15:42:46 WARN ExternalBlockStoreClient: Error while trying to get the host local dirs for [shuffle-push-merger]
21/07/15 15:42:46 WARN PushBasedFetchHelper: Error while fetching the merged dirs for push-merged-local blocks: shuffle_0_-1_13. Fetch the original blocks instead
java.lang.RuntimeException: java.lang.IllegalStateException: Invalid executor id: shuffle-push-merger, expected 92.
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:130)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
```
After the fix, the executors were able to fetch the local push-merged blocks.

Closes #33378 from otterc/SPARK-32922-followup.

Authored-by: Chandni Singh <singh.chan...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d2cbadcfe3b4badad9400c3bedf697ea6196ffa)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../org/apache/spark/storage/PushBasedFetchHelper.scala    | 14 +++++++++-----
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala |  6 +++---
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
index 63f42a0..096ea24 100644
--- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
@@ -36,8 +36,8 @@ import org.apache.spark.storage.ShuffleBlockFetcherIterator._
  * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
  * functionality to fetch push-merged block meta and shuffle chunks.
  * A push-merged block contains multiple shuffle chunks where each shuffle chunk contains multiple
- * shuffle blocks that belong to the common reduce partition and were merged by the ESS to that
- * chunk.
+ * shuffle blocks that belong to the common reduce partition and were merged by the
+ * external shuffle service to that chunk.
  */
 private class PushBasedFetchHelper(
     private val iterator: ShuffleBlockFetcherIterator,
@@ -197,9 +197,13 @@ private class PushBasedFetchHelper(
           localShuffleMergerBlockMgrId)
       }
     } else {
-      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged dirs")
-      hostLocalDirManager.getHostLocalDirs(localShuffleMergerBlockMgrId.host,
-        localShuffleMergerBlockMgrId.port, Array(SHUFFLE_MERGER_IDENTIFIER)) {
+      // Push-based shuffle is only enabled when the external shuffle service is enabled. If the
+      // external shuffle service is not enabled, then there will not be any push-merged blocks
+      // for the iterator to fetch.
+      logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged " +
+        s"dirs from the external shuffle service")
+      hostLocalDirManager.getHostLocalDirs(blockManager.blockManagerId.host,
+        blockManager.externalShuffleServicePort, Array(SHUFFLE_MERGER_IDENTIFIER)) {
         case Success(dirs) =>
           logDebug(s"Fetched merged dirs in " +
             s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 094c3b5..d03f20a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -386,7 +386,7 @@ final class ShuffleBlockFetcherIterator(
         if (address.host == blockManager.blockManagerId.host) {
           numBlocksToFetch += blockInfos.size
           pushMergedLocalBlocks ++= blockInfos.map(_._1)
-          pushMergedLocalBlockBytes += blockInfos.map(_._3).sum
+          pushMergedLocalBlockBytes += blockInfos.map(_._2).sum
         } else {
           collectFetchRequests(address, blockInfos, collectedRemoteRequests)
         }
@@ -886,8 +886,8 @@ final class ShuffleBlockFetcherIterator(
             // blockId is a ShuffleBlockChunkId.
             // 2. Failure to read the push-merged-local meta. In this case, the blockId is
             // ShuffleBlockId.
-            // 3. Failure to get the push-merged-local directories from the ESS. In this case, the
-            // blockId is ShuffleBlockId.
+            // 3. Failure to get the push-merged-local directories from the external shuffle service.
+            // In this case, the blockId is ShuffleBlockId.
             if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
               numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
               bytesInFlight -= size
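
For illustration, here is a minimal Scala sketch of the two fixes. It uses simplified stand-ins, not Spark's real classes. The `BlockInfo` alias models the `(blockId, size, mapIndex)` tuples in `blockInfos`, with block IDs as plain strings; the change to `pushMergedLocalBlockBytes` implies that the size is the second element. `Endpoint`, the object name, the `-1` map indexes, and port 43001 are hypothetical. 7337 is the default value of `spark.shuffle.service.port`. The "before" address function reflects an assumption: the stack trace through `NettyBlockRpcServer.receive` suggests that the old host/port pair pointed at an executor rather than at the external shuffle service.

```scala
// Hypothetical, simplified model of the two fixes; not Spark's actual classes.
object PushMergedFetchFixSketch {
  // (blockId, sizeInBytes, mapIndex), mirroring the tuple shape of blockInfos.
  type BlockInfo = (String, Long, Int)

  // Hypothetical host/port pair standing in for a BlockManagerId or ESS address.
  final case class Endpoint(host: String, port: Int)

  // Bug (2), before the fix: sums the third element (map index), not the size.
  def bytesBeforeFix(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._3.toLong).sum

  // Bug (2), after the fix: sums the second element, the block size in bytes.
  def bytesAfterFix(blockInfos: Seq[BlockInfo]): Long =
    blockInfos.map(_._2).sum

  // Bug (1), before the fix: reuse the merger id's host and port (assumed here
  // to be an executor's port, not the external shuffle service's).
  def dirsTargetBeforeFix(mergerId: Endpoint): Endpoint =
    Endpoint(mergerId.host, mergerId.port)

  // Bug (1), after the fix: this executor's host plus the external shuffle service port.
  def dirsTargetAfterFix(localHost: String, externalShuffleServicePort: Int): Endpoint =
    Endpoint(localHost, externalShuffleServicePort)

  def main(args: Array[String]): Unit = {
    // Illustrative values only; -1 as the map index is an assumption.
    val infos: Seq[BlockInfo] =
      Seq(("shuffle_0_-1_13", 1024L, -1), ("shuffle_0_-1_14", 2048L, -1))
    println(s"before=${bytesBeforeFix(infos)} after=${bytesAfterFix(infos)}")

    val mergerId = Endpoint("host-1", 43001) // hypothetical executor port
    println(dirsTargetBeforeFix(mergerId))
    println(dirsTargetAfterFix("host-1", 7337)) // default spark.shuffle.service.port
  }
}
```

With these made-up inputs, the first line printed is `before=-2 after=3072`. Summing the map indexes produces a meaningless (here negative) byte count. That is consistent with bug (2), where the size of the push-merged blocks was logged incorrectly.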