mridulm commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r670657063
########## File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java ########## @@ -821,20 +914,132 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws } } + @Test(expected = IllegalArgumentException.class) + public void testBlockReceivedAfterNewAttemptRegistered() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "updateLocalDirsTwiceWithTwoAttempts"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + Map<Integer, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions = + appShuffleInfo.getPartitions(); + for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertTrue(partitionInfo.getDataChannel().isOpen()); + assertTrue(partitionInfo.getMetaFile().getChannel().isOpen()); + assertTrue(partitionInfo.getIndexFile().getChannel().isOpen()); + } + } + Path[] attempt2LocalDirs = createLocalDirs(2); + 
registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 2, 0, 1, 0, 0)); + for (ByteBuffer block : blocks) { + stream2.onData(stream2.getID(), block); + } + stream2.onComplete(stream2.getID()); + closed.acquire(); + // Check if all the file channels created for the first attempt are safely closed. + for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertNull(partitionInfo.getDataChannel()); + assertNull(partitionInfo.getMetaFile()); + assertNull(partitionInfo.getIndexFile()); + } + } + try { + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 1, 0, 0)); + } catch (IllegalArgumentException re) { + assertEquals( + "The attempt id 1 in this PushBlockStream message does not match " + + "with the current attempt id 2 stored in shuffle service for application " + + "updateLocalDirsTwiceWithTwoAttempts", re.getMessage()); + throw re; + } + } + + @Test(expected = NullPointerException.class) + public void testPushBlockStreamCallBackWhileNewAttemptRegistered() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "testPushBlockStreamCallBackWhileNewAttemptRegisters"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + 
ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]), + ByteBuffer.wrap(new byte[6]), + ByteBuffer.wrap(new byte[7]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + // The onData callback should be called 4 times here before the onComplete callback. But a + // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd + // onData callback should all throw NullPointerException as their channels are set to null. + stream1.onData(stream1.getID(), blocks[0]); + stream1.onData(stream1.getID(), blocks[1]); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + closed.acquire(); + // Should throw NullPointerException here. Review comment: Ah! Can we pull that into a local variable in that method, check that it is non-null, and then use the local variable within the method? (Note: pull it out of `partitionInfo` only for the duration of the method, not across invocations.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org