This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5d6b69fbc46 [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received 5d6b69fbc46 is described below commit 5d6b69fbc46ea52852dbe86f144ab87495af64a8 Author: Mridul Muralidharan <mridul<at>gmail.com> AuthorDate: Sun Jan 1 13:41:35 2023 -0600 [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received ### What changes were proposed in this pull request? When a newer shuffle merge id is received, the wrong merge id was removed from the DB: cleanup was performed for the newly received merge id instead of the previous (now outdated) one. This change passes the previous merge id to the cleanup. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? No, fixes a corner case bug ### How was this patch tested? Unit test updated Closes #39316 from mridulm/SPARK-41792. Authored-by: Mridul Muralidharan <mridul<at>gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/shuffle/RemoteBlockPushResolver.java | 18 ++++++++++-------- .../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++++++++++++++++++ .../spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 816d1082850..c3a2e9a883a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -227,15 +227,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being // run for the shuffle ID. 
Close and clean up old shuffleMergeId files, // happens in the indeterminate stage retries - AppAttemptShuffleMergeId appAttemptShuffleMergeId = - new AppAttemptShuffleMergeId( - appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId); + AppAttemptShuffleMergeId currrentAppAttemptShuffleMergeId = + new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, + shuffleId, latestShuffleMergeId); logger.info("{}: creating a new shuffle merge metadata since received " + - "shuffleMergeId is higher than latest shuffleMergeId {}", - appAttemptShuffleMergeId, latestShuffleMergeId); + "shuffleMergeId {} is higher than latest shuffleMergeId {}", + currrentAppAttemptShuffleMergeId, shuffleMergeId, latestShuffleMergeId); submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + closeAndDeleteOutdatedPartitions(currrentAppAttemptShuffleMergeId, + mergePartitionsInfo.shuffleMergePartitions)); return new AppShuffleMergePartitionsInfo(shuffleMergeId, false); } else { // The request is for block with same shuffleMergeId as the latest shuffleMergeId @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. + AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); submitCleanupTask(() -> closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); } else { // This block covers: // 1. 
finalization of determinate stage diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index eb2c1d9fa5c..6a595ee346d 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -28,7 +28,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -1146,11 +1148,14 @@ public class RemoteBlockPushResolverSuite { @Test public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); + List<RemoteBlockPushResolver.AppAttemptShuffleMergeId> removedIds = + new CopyOnWriteArrayList<>(); pushResolver = new RemoteBlockPushResolver(conf, null) { @Override void closeAndDeleteOutdatedPartitions( AppAttemptShuffleMergeId appAttemptShuffleMergeId, Map<Integer, AppShufflePartitionInfo> partitions) { + removedIds.add(appAttemptShuffleMergeId); super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions); closed.release(); } @@ -1167,6 +1172,10 @@ public class RemoteBlockPushResolverSuite { RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = pushResolver.validateAndGetAppShuffleInfo(testApp); closed.acquire(); + assertEquals(1, removedIds.size()); + // For the previous merge id + assertEquals(1, removedIds.iterator().next().shuffleMergeId); + removedIds.clear(); assertFalse("Data files on the disk should be cleaned up", appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); assertFalse("Meta files on the disk should be cleaned 
up", @@ -1186,6 +1195,9 @@ public class RemoteBlockPushResolverSuite { pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0)); closed.acquire(); + assertEquals(1, removedIds.size()); + assertEquals(2, removedIds.iterator().next().shuffleMergeId); + removedIds.clear(); stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); stream3.onComplete(stream3.getID()); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3)); @@ -1196,6 +1208,9 @@ public class RemoteBlockPushResolverSuite { pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0)); closed.acquire(); + assertEquals(1, removedIds.size()); + assertEquals(3, removedIds.iterator().next().shuffleMergeId); + removedIds.clear(); // Do not finalize shuffleMergeId 4 can happen during stage cancellation. stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2])); stream4.onComplete(stream4.getID()); @@ -1204,6 +1219,10 @@ public class RemoteBlockPushResolverSuite { // but no blocks pushed for that shuffleMergeId pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5)); closed.acquire(); + assertEquals(1, removedIds.size()); + // For the previous merge id - here the cleanup is from finalizeShuffleMerge + assertEquals(4, removedIds.iterator().next().shuffleMergeId); + removedIds.clear(); assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned" + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists()); assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned" diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 16fa4205692..075a21c399e 100644 --- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload( mergeManager2, mergeManager2DB) == 1) assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload( - mergeManager2, mergeManager2DB) == 2) + mergeManager2, mergeManager2DB) == 1) s2.stop() // Yarn Shuffle service comes back up without custom mergeManager --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org