This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 85d41757e85 [SPARK-44215][SHUFFLE] If num chunks are 0, then server should throw a RuntimeException 85d41757e85 is described below commit 85d41757e855d97dee0f24f281f82249161c3d29 Author: Chandni Singh <singh.chan...@gmail.com> AuthorDate: Tue Jul 4 18:46:18 2023 -0500 [SPARK-44215][SHUFFLE] If num chunks are 0, then server should throw a RuntimeException ### What changes were proposed in this pull request? The executor expects `numChunks` to be > 0. If it is zero, then we see that the executor fails with ``` 23/06/20 19:07:37 ERROR task 2031.0 in stage 47.0 (TID 25018) Executor: Exception in task 2031.0 in stage 47.0 (TID 25018) java.lang.ArithmeticException: / by zero at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) ``` Because this is an `ArithmeticException`, the executor doesn't fallback. It's not a `FetchFailure` either, so the stage is not retried and the application fails. ### Why are the changes needed? The executor should fallback to fetch original blocks and not fail because this suggests that there is an issue with push-merged block. ### Does this PR introduce _any_ user-facing change? No. 
### How was this patch tested? Modified the existing UTs to validate that RuntimeException is thrown when numChunks are 0. Closes #41762 from otterc/SPARK-44215. Authored-by: Chandni Singh <singh.chan...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 3e72806bb421b103bf6e73518b80200ccdd58ce5) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> --- .../network/shuffle/RemoteBlockPushResolver.java | 4 ++++ .../shuffle/RemoteBlockPushResolverSuite.java | 24 ++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index df2d1fa12d1..04935cfd932 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -328,6 +328,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { int size = (int) indexFile.length(); // First entry is the zero offset int numChunks = (size / Long.BYTES) - 1; + if (numChunks <= 0) { + throw new RuntimeException(String.format( + "Merged shuffle index file %s is empty", indexFile.getPath())); + } File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); if (!metaFile.exists()) { throw new RuntimeException(String.format("Merged shuffle meta file %s not found", diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 2526a94f429..0847121b0cc 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -281,7 +281,7 @@ public class RemoteBlockPushResolverSuite { verifyMetrics(4, 0, 0, 0, 0, 0, 4); } - @Test + @Test(expected = RuntimeException.class) public void testFailureAfterData() throws IOException { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( @@ -289,12 +289,16 @@ public class RemoteBlockPushResolverSuite { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); - verifyMetrics(4, 0, 0, 0, 0, 0, 4); + try { + pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("is empty")); + verifyMetrics(4, 0, 0, 0, 0, 0, 4); + throw e; + } } - @Test + @Test(expected = RuntimeException.class) public void testFailureAfterMultipleDataBlocks() throws IOException { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( @@ -304,9 +308,13 @@ public class RemoteBlockPushResolverSuite { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); - verifyMetrics(9, 0, 0, 0, 0, 0, 9); + try { + pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("is empty")); + verifyMetrics(9, 0, 0, 0, 0, 0, 9); + throw e; + } } @Test --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org