This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new c106e01 [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle … c106e01 is described below commit c106e01125b8f147615afd7ffeb3fca8210b339c Author: Chandni Singh <singh.chan...@gmail.com> AuthorDate: Thu Mar 25 12:47:46 2021 -0500 [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle … ### What changes were proposed in this pull request? This PR fixes bugs that cause corruption of push-merged blocks when a client terminates while pushing a block. `RemoteBlockPushResolver` was introduced in #30062 (SPARK-32916). There are 2 scenarios where the merged blocks get corrupted: 1. `StreamCallback.onFailure()` is called more than once. Initially we assumed that the onFailure callback would be called just once per stream. However, we observed that it is called twice when a client connection is reset. When the client connection is reset, 2 events get triggered in this order. - `exceptionCaught`. This event is propagated to `StreamInterceptor`. `StreamInterceptor.exceptionCaught()` invokes `callback.onFailure(streamId, cause)`. This is the first time StreamCallback.onFailure() will be invoked. - `channelInactive`. Since the channel closes, the `channelInactive` event gets triggered, which again is propagated to `StreamInterceptor`. `StreamInterceptor.channelInactive()` invokes `callback.onFailure(streamId, new ClosedChannelException())`. This is the second time StreamCallback.onFailure() will be invoked. 2. The flag `isWriting` is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails. This PR also includes additional changes that improve the code. 1. Using positional writes all the time, because this simplifies the code and microbenchmarking has not shown any performance impact. 2.
Additional minor changes suggested by mridulm during an internal review. ### Why are the changes needed? These are bug fixes and simplify the code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests. I have also tested these changes in Linkedin's internal fork on a cluster. Co-authored-by: Chandni Singh chsinghlinkedin.com Co-authored-by: Min Shen mshenlinkedin.com Closes #31934 from otterc/SPARK-32916-followup. Lead-authored-by: Chandni Singh <singh.chan...@gmail.com> Co-authored-by: Min Shen <ms...@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 6d88212f79a67fa070a91a0123c5bb34683ddc3a) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> --- .../network/shuffle/RemoteBlockPushResolver.java | 53 ++++--------- .../shuffle/RemoteBlockPushResolverSuite.java | 87 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 43 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 0e23556..9363efc5 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -445,9 +444,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private final AppShufflePartitionInfo partitionInfo; private int length = 0; // This indicates that this stream got the opportunity to write the blocks to the merged file. 
- // Once this is set to true and the stream encounters a failure then it will take necessary - // action to overwrite any partial written data. This is reset to false when the stream - // completes without any failures. + // Once this is set to true and the stream encounters a failure then it will unset the + // currentMapId of the partition so that another stream can start merging the blocks to the + // partition. This is reset to false when the stream completes. private boolean isWriting = false; // Use on-heap instead of direct ByteBuffer since these buffers will be GC'ed very quickly private List<ByteBuffer> deferredBufs; @@ -477,16 +476,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { */ private void writeBuf(ByteBuffer buf) throws IOException { while (buf.hasRemaining()) { - if (partitionInfo.isEncounteredFailure()) { - long updatedPos = partitionInfo.getDataFilePos() + length; - logger.debug( - "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, - partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); - length += partitionInfo.dataChannel.write(buf, updatedPos); - } else { - length += partitionInfo.dataChannel.write(buf); - } + long updatedPos = partitionInfo.getDataFilePos() + length; + logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}", + partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); + length += partitionInfo.dataChannel.write(buf, updatedPos); } } @@ -581,7 +575,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } // Check whether we can write to disk if (allowedToWrite()) { - isWriting = true; // Identify duplicate block generated by speculative tasks. We respond success to // the client in cases of duplicate even though no data is written. 
if (isDuplicateBlock()) { @@ -598,6 +591,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // If we got here, it's safe to write the block data to the merged shuffle file. We // first write any deferred block. + isWriting = true; try { if (deferredBufs != null && !deferredBufs.isEmpty()) { writeDeferredBufs(); @@ -609,16 +603,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // back to the client so the block could be retried. throw ioe; } - // If we got here, it means we successfully write the current chunk of block to merged - // shuffle file. If we encountered failure while writing the previous block, we should - // reset the file channel position and the status of partitionInfo to indicate that we - // have recovered from previous disk write failure. However, we do not update the - // position tracked by partitionInfo here. That is only updated while the entire block - // is successfully written to merged shuffle file. - if (partitionInfo.isEncounteredFailure()) { - partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length); - partitionInfo.setEncounteredFailure(false); - } } else { logger.trace("{} shuffleId {} reduceId {} onData deferred", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, @@ -639,7 +623,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // written to disk due to this reason. We thus decide to optimize for server // throughput and memory usage. if (deferredBufs == null) { - deferredBufs = new LinkedList<>(); + deferredBufs = new ArrayList<>(); } // Write the buffer to the in-memory deferred cache. 
Since buf is a slice of a larger // byte buffer, we cache only the relevant bytes not the entire large buffer to save @@ -670,7 +654,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } // Check if we can commit this block if (allowedToWrite()) { - isWriting = true; // Identify duplicate block generated by speculative tasks. We respond success to // the client in cases of duplicate even though no data is written. if (isDuplicateBlock()) { @@ -681,6 +664,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { try { if (deferredBufs != null && !deferredBufs.isEmpty()) { abortIfNecessary(); + isWriting = true; writeDeferredBufs(); } } catch (IOException ioe) { @@ -738,14 +722,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { Map<Integer, AppShufflePartitionInfo> shufflePartitions = mergeManager.partitions.get(partitionInfo.appShuffleId); if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) { - logger.debug("{} shuffleId {} reduceId {} set encountered failure", + logger.debug("{} shuffleId {} reduceId {} encountered failure", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, partitionInfo.reduceId); partitionInfo.setCurrentMapIndex(-1); - partitionInfo.setEncounteredFailure(true); } } } + isWriting = false; } @VisibleForTesting @@ -802,8 +786,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public FileChannel dataChannel; // Location offset of the last successfully merged block for this shuffle partition private long dataFilePos; - // Indicating whether failure was encountered when merging the previous block - private boolean encounteredFailure; // Track the map index whose block is being merged for this shuffle partition private int currentMapIndex; // Bitmap tracking which mapper's blocks have been merged for this shuffle partition @@ -836,7 +818,6 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager { // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex() updateChunkInfo(0L, -1); this.dataFilePos = 0; - this.encounteredFailure = false; this.mapTracker = new RoaringBitmap(); this.chunkTracker = new RoaringBitmap(); } @@ -851,14 +832,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { this.dataFilePos = dataFilePos; } - boolean isEncounteredFailure() { - return encounteredFailure; - } - - void setEncounteredFailure(boolean encounteredFailure) { - this.encounteredFailure = encounteredFailure; - } - int getCurrentMapIndex() { return currentMapIndex; } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 8c6f743..565d433 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -292,18 +293,32 @@ public class RemoteBlockPushResolverSuite { @Test public void testIncompleteStreamsAreOverwritten() throws IOException { registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + byte[] expectedBytes = new byte[4]; + ThreadLocalRandom.current().nextBytes(expectedBytes); + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); - stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4])); + byte[] data = new byte[10]; + ThreadLocalRandom.current().nextBytes(data); + stream1.onData(stream1.getID(), ByteBuffer.wrap(data)); // There 
is a failure stream1.onFailure(stream1.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); - stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); + ByteBuffer nextBuf= ByteBuffer.wrap(expectedBytes, 0, 2); + stream2.onData(stream2.getID(), nextBuf); stream2.onComplete(stream2.getID()); + StreamCallbackWithID stream3 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + nextBuf = ByteBuffer.wrap(expectedBytes, 2, 2); + stream3.onData(stream3.getID(), nextBuf); + stream3.onComplete(stream3.getID()); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); - validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{5}, new int[][]{{1}}); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 2}}); + FileSegmentManagedBuffer mb = + (FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 0, 0); + assertArrayEquals(expectedBytes, mb.nioByteBuffer().array()); } @Test (expected = RuntimeException.class) @@ -740,6 +755,72 @@ public class RemoteBlockPushResolverSuite { validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}}); } + @Test + public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException { + StreamCallbackWithID stream1 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + stream1.onFailure(stream1.getID(), new RuntimeException("forced error")); + StreamCallbackWithID stream2 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); + // On failure on stream1 gets invoked again and should cause no interference + stream1.onFailure(stream1.getID(), new RuntimeException("2nd 
forced error")); + StreamCallbackWithID stream3 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 3, 0, 0)); + // This should be deferred as stream 2 is still the active stream + stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); + // Stream 2 writes more and completes + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); + stream2.onComplete(stream2.getID()); + stream3.onComplete(stream3.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] {{1},{3}}); + removeApplication(TEST_APP); + } + + @Test (expected = RuntimeException.class) + public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException { + StreamCallbackWithID stream1 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + StreamCallbackWithID stream1Duplicate = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + stream1.onComplete(stream1.getID()); + stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + + StreamCallbackWithID stream2 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); + // Should not change the current map id of the reduce partition + stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced error")); + + StreamCallbackWithID stream3 = + pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + // This should be deferred as stream 2 is still the active stream + stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); + RuntimeException failedEx = null; + try { + stream3.onComplete(stream3.getID()); + } catch (RuntimeException re) { + 
assertEquals( + "Couldn't find an opportunity to write block shufflePush_0_2_0 to merged shuffle", + re.getMessage()); + failedEx = re; + } + // Stream 2 writes more and completes + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); + stream2.onComplete(stream2.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 1}}); + removeApplication(TEST_APP); + if (failedEx != null) { + throw failedEx; + } + } + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { pushResolver = new RemoteBlockPushResolver(conf) { @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org