mridulm commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r670212415



##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -821,20 +914,132 @@ public void 
testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReceivedAfterNewAttemptRegistered()
+      throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "updateLocalDirsTwiceWithTwoAttempts";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(testApp);
+    Map<Integer, Map<Integer, 
RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.getPartitions();
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> 
partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : 
partitionMap.values()) {
+        assertTrue(partitionInfo.getDataChannel().isOpen());
+        assertTrue(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertTrue(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream2.onData(stream2.getID(), block);
+    }
+    stream2.onComplete(stream2.getID());
+    closed.acquire();
+    // Check if all the file channels created for the first attempt are safely 
closed.
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> 
partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : 
partitionMap.values()) {
+        assertNull(partitionInfo.getDataChannel());
+        assertNull(partitionInfo.getMetaFile());
+        assertNull(partitionInfo.getIndexFile());
+      }
+    }
+    try {
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, 1, 0, 1, 0, 0));
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "The attempt id 1 in this PushBlockStream message does not match " +
+          "with the current attempt id 2 stored in shuffle service for 
application " +
+          "updateLocalDirsTwiceWithTwoAttempts", re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testPushBlockStreamCallBackWhileNewAttemptRegistered()
+    throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "testPushBlockStreamCallBackWhileNewAttemptRegisters";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5]),
+      ByteBuffer.wrap(new byte[6]),
+      ByteBuffer.wrap(new byte[7])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    // The onData callback should be called 4 times here before the onComplete 
callback. But a
+    // register executor message arrives in shuffle service after the 2nd 
onData callback. The 3rd
+    // onData callback should all throw NullPointerException as their channels 
are set to null.
+    stream1.onData(stream1.getID(), blocks[0]);
+    stream1.onData(stream1.getID(), blocks[1]);
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    closed.acquire();
+    // Should throw NullPointerException here.

Review comment:
       The NPE is coming from precondition validation failing - not actual NPE 
due to dereferencing a null, right ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to