This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ddd867056f9 [SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes in pushMergeMetrics to cover more scenarios
ddd867056f9 is described below

commit ddd867056f9f4f81d158f2699533152d6dc4c3d7
Author: Minchu Yang <miny...@minyang-mn1.linkedin.biz>
AuthorDate: Fri Jan 27 04:15:15 2023 -0600

    [SPARK-33573][CORE][FOLLOW-UP] Enhance ignoredBlockBytes in pushMergeMetrics to cover more scenarios

    ### What changes were proposed in this pull request?

    Currently, the `ignoredBlockBytes` server-side metric for push-based shuffle does not
    fully capture the block bytes that were received by the ESS but did not get merged.
    This PR enhances the logic for incrementing `ignoredBlockBytes` so that it captures
    the bytes ignored by the ESS. Specifically, bytes are considered ignored when they are:
    1. received after the shuffle file is finalized;
    2. part of a request for a duplicate block;
    3. received by the ESS but failed to be written.

    ### Why are the changes needed?

    This makes the `ignoredBlockBytes` server-side metric for push-based shuffle more accurate.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Modified existing UTs to capture `ignoredBlockBytes`.

    Closes #39725 from rmcyang/SPARK-33573-FOLLOWUP.
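The accounting described above (tally bytes as they arrive, and count them toward the metric only once the block is known not to merge) can be sketched as follows. This is an illustrative sketch, not the patched Spark code: the class and method names (`IgnoredBytesTracker`, `markIgnored`, `ignoredTotal`) are invented for this example, and a `LongAdder` stands in for the Dropwizard meter behind `pushMergeMetrics.ignoredBlockBytes`.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.LongAdder;

public class IgnoredBytesTracker {
    // Stand-in for the pushMergeMetrics.ignoredBlockBytes meter.
    private final LongAdder ignoredBlockBytes = new LongAdder();
    // Bytes received so far for the current pushed block.
    private long receivedBytes = 0;

    // Called for every chunk of pushed block data; note remaining(), not limit(),
    // so only the unread portion of the buffer is counted.
    public void onData(ByteBuffer buf) {
        receivedBytes += buf.remaining();
    }

    // Called once the block is known to be dropped: duplicate block,
    // push after finalization, or a failed write.
    public void markIgnored() {
        if (receivedBytes > 0) {
            ignoredBlockBytes.add(receivedBytes);
            receivedBytes = 0; // avoid double counting on repeated calls
        }
    }

    public long ignoredTotal() {
        return ignoredBlockBytes.sum();
    }

    public static void main(String[] args) {
        IgnoredBytesTracker tracker = new IgnoredBytesTracker();
        tracker.onData(ByteBuffer.wrap(new byte[4])); // a 4-byte chunk arrives
        tracker.markIgnored();                        // block turns out to be a duplicate
        System.out.println(tracker.ignoredTotal());
    }
}
```

Resetting `receivedBytes` inside `markIgnored` mirrors the patch's `updateIgnoredBlockBytes`, which makes the method safe to call from more than one failure path for the same stream.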
    Authored-by: Minchu Yang <miny...@minyang-mn1.linkedin.biz>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 28 +++++++++++++++++++---
 .../shuffle/RemoteBlockPushResolverSuite.java      | 18 ++++++++------
 docs/monitoring.md                                 | 17 ++++++-------
 3 files changed, 45 insertions(+), 18 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a2e8219228a..df2d1fa12d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -701,7 +701,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       public void onData(String streamId, ByteBuffer buf) {
         // Ignore the requests. It reaches here either when a request is received after the
         // shuffle file is finalized or when a request is for a duplicate block.
-        pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
+        pushMergeMetrics.ignoredBlockBytes.mark(buf.remaining());
       }

       @Override
@@ -1211,6 +1211,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     // Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly
     private List<ByteBuffer> deferredBufs;

+    // This collects the total pushed block bytes received in the onData method. Once these
+    // bytes are no longer used, we add them to the ignoredBlockBytes of the pushMergeMetrics.
+    private long receivedBytes = 0;
+
     private PushBlockStreamCallback(
         RemoteBlockPushResolver mergeManager,
         AppShuffleInfo appShuffleInfo,
@@ -1322,6 +1326,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       }
     }

+    /**
+     * Update ignoredBlockBytes in pushMergeMetrics.
+     */
+    private void updateIgnoredBlockBytes() {
+      if (receivedBytes > 0) {
+        mergeManager.pushMergeMetrics.ignoredBlockBytes.mark(receivedBytes);
+        receivedBytes = 0;
+      }
+    }
+
     /**
      * This increments the number of IOExceptions and throws RuntimeException if it exceeds the
      * threshold which will abort the merge of a particular shuffle partition.
@@ -1358,6 +1372,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {

     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
+      receivedBytes += buf.remaining();
       // When handling the block data using StreamInterceptor, it can help to reduce the amount
       // of data that needs to be buffered in memory since it does not wait till the completion
       // of the frame before handling the message, thus releasing the ByteBuf earlier. However,
@@ -1481,6 +1496,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // the client in cases of duplicate even though no data is written.
       if (isDuplicateBlock()) {
         freeDeferredBufs();
+        // Since we just return without throwing an exception, and the received bytes are
+        // ignored, we need to add them to ignoredBlockBytes in pushMergeMetrics.
+        updateIgnoredBlockBytes();
         return;
       }
       if (partitionInfo.getCurrentMapIndex() < 0) {
@@ -1538,6 +1556,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       } else {
         logger.debug("Encountered issue when merging {}", streamId, throwable);
       }
+      // The block was received by the ESS but did not get merged, so it is considered "ignored".
+      // Capturing these bytes in ignoredBlockBytes helps measure any server-side improvement.
+      updateIgnoredBlockBytes();
       // Only update partitionInfo if the failure corresponds to a valid request. If the
       // request is too late, i.e. received after shuffle merge finalize or stale block push,
       // #onFailure will also be triggered, and we can just ignore. Also, if we couldn't find
@@ -2126,8 +2147,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
     // staleBlockPushes tracks the number of stale shuffle block push requests
     static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
-    // ignoredBlockBytes tracks the size of the blocks that are ignored after the shuffle file is
-    // finalized or when a request is for a duplicate block
+    // ignoredBlockBytes tracks the size of the blocks that are ignored. Pushed block data is
+    // considered ignored in these cases: 1. it is received after the shuffle file is finalized;
+    // 2. the request is for a duplicate block; 3. the ESS failed to write it.
     static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";

     private final Map<String, Metric> allMetrics;
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 630a651d243..2526a94f429 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -278,7 +278,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(4, 0, 0, 0, 0, 0, 4);
   }

   @Test
@@ -291,7 +291,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(4, 0, 0, 0, 0, 0, 4);
   }

   @Test
@@ -306,7 +306,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
-    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(9, 0, 0, 0, 0, 0, 9);
   }

   @Test
@@ -322,7 +322,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
-    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(9, 0, 0, 0, 0, 0, 9);
   }

   @Test
@@ -385,7 +385,7 @@ public class RemoteBlockPushResolverSuite {
     FileSegmentManagedBuffer mb =
       (FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 0, 0, 0);
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
-    verifyMetrics(14, 0, 0, 0, 0, 0, 0);
+    verifyMetrics(14, 0, 0, 0, 0, 0, 10);
   }

   @Test
@@ -1066,6 +1066,8 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
       () -> stream1.onComplete(stream1.getID()));
+    // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+    stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
     BlockPushReturnCode errorCode =
       (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
     assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1076,7 +1078,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
     validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+    verifyMetrics(6, 0, 0, 0, 0, 2, 4);
   }

   @Test
@@ -1094,6 +1096,8 @@ public class RemoteBlockPushResolverSuite {
     // stream 1 push should be rejected as it is from an older shuffleMergeId
     BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
       () -> stream1.onComplete(stream1.getID()));
+    // Trigger onFailure so that the stale bytes would be added into ignoredBytes
+    stream1.onFailure(stream1.getID(), new RuntimeException("Forced Failure"));
     BlockPushReturnCode errorCode =
       (BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
     assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
@@ -1111,7 +1115,7 @@ public class RemoteBlockPushResolverSuite {
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
     validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
-    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
+    verifyMetrics(6, 0, 0, 0, 0, 2, 4);
   }

   @Test
diff --git a/docs/monitoring.md b/docs/monitoring.md
index dbc2c14aea6..1f7acf4dece 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1425,16 +1425,17 @@ Note: applies to the shuffle service
 - **note:** the metrics below apply when the server side configuration
   `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is set to
   `org.apache.spark.network.shuffle.MergedShuffleFileManager` for Push-Based Shuffle
-- blockBytesWritten - the size of the pushed block data written to file in bytes
-- blockAppendCollisions - the number of shuffle push blocks collided in shuffle services
+- blockBytesWritten - size of the pushed block data written to file in bytes
+- blockAppendCollisions - number of shuffle push blocks collided in shuffle services
   as another block for the same reduce partition were being written
-- lateBlockPushes - the number of shuffle push blocks that are received in shuffle service
+- lateBlockPushes - number of shuffle push blocks that are received in shuffle service
   after the specific shuffle merge has been finalized
-- deferredBlocks - the number of the current deferred block parts buffered in memory
-- deferredBlockBytes - the size of the current deferred block parts buffered in memory
-- staleBlockPushes - the number of stale shuffle block push requests
-- ignoredBlockBytes - the size of the pushed block data that are ignored after the shuffle
-  file is finalized or when a request is for a duplicate block
+- deferredBlocks - number of the current deferred block parts buffered in memory
+- deferredBlockBytes - size of the current deferred block parts buffered in memory
+- staleBlockPushes - number of stale shuffle block push requests
+- ignoredBlockBytes - size of the pushed block data that was transferred to the ESS but
+  ignored. Pushed block data is considered ignored when: 1. it was received after the
+  shuffle was finalized; 2. the push request was for a duplicate block; 3. the ESS was
+  unable to write the block.

 # Advanced Instrumentation
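One line of the patch above swaps `buf.limit()` for `buf.remaining()` when marking ignored bytes. The difference matters once part of the buffer has already been consumed: `limit()` reports the buffer's end regardless of position, while `remaining()` reports only the unread bytes. A minimal standalone illustration (not part of the patch itself):

```java
import java.nio.ByteBuffer;

public class RemainingVsLimit {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[10]);
        buf.get(new byte[4]); // read 4 bytes; position advances to 4

        // limit() ignores the position and still reports 10 here, which
        // would over-count bytes that were already handled upstream.
        System.out.println(buf.limit());     // prints 10

        // remaining() = limit() - position() reports only the 6 unread
        // bytes, i.e. the amount of data actually being dropped.
        System.out.println(buf.remaining()); // prints 6
    }
}
```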