XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1474488421
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
   ```suggestion
       public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {
   ```
   
   Can't we make the number of checkpoints to wait for configurable? That way, we can pass in `2` in the test implementation, analogously to `waitForCheckpoint`. I also feel like we could remove some of the redundant code shared by the two methods. :thinking:

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   So far, we've only seen the issue in `#testCheckpointRescalingKeyedState`. We don't actually need the two checkpoints here, because this test doesn't rely on the elements. We could keep the test's functionality as-is if we make `waitForOneMoreCheckpoint` configurable, as suggested in one of my previous comments.
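A minimal sketch of the configurable variant suggested above, with the polling loop shared between the two helpers instead of duplicated. It is not Flink code: to keep it self-contained, the `JobID`/`MiniCluster` lookup is replaced by a hypothetical `LongSupplier` returning the number of completed checkpoints, and the class name `CheckpointWaits` and the explicit `timeout` parameter are illustrative. `waitForCheckpoint` is assumed here to mean "at least n completed checkpoints in total", which may not match the real helper's semantics.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

/**
 * Hypothetical, Flink-free sketch of the two CommonTestUtils helpers. The JobID/MiniCluster
 * lookup is replaced by a LongSupplier that returns the number of completed checkpoints.
 */
final class CheckpointWaits {

    private static final long POLL_INTERVAL_MS = 10;

    private CheckpointWaits() {}

    /** Polls until at least {@code minCompleted} checkpoints have completed in total. */
    static void waitForCheckpoint(
            LongSupplier completedCheckpoints, long minCompleted, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (completedCheckpoints.getAsLong() < minCompleted) {
            // Overflow-safe deadline check for System.nanoTime().
            if (System.nanoTime() - deadline > 0) {
                throw new TimeoutException(
                        "Expected at least " + minCompleted + " completed checkpoints");
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Waits until {@code checkpointCount} more checkpoints have completed after this call.
     * Passing 2 covers the case where the first checkpoint to complete was already triggered
     * before the call, assuming checkpoints do not run concurrently.
     */
    static void waitForNewCheckpoint(
            LongSupplier completedCheckpoints, int checkpointCount, Duration timeout)
            throws InterruptedException, TimeoutException {
        if (checkpointCount < 1) {
            throw new IllegalArgumentException("checkpointCount must be >= 1");
        }
        // Snapshot the current count, then delegate to the absolute-count helper so the
        // polling loop lives in one place only.
        long baseline = completedCheckpoints.getAsLong();
        waitForCheckpoint(completedCheckpoints, baseline + checkpointCount, timeout);
    }
}
```

With this shape, `#testCheckpointRescalingKeyedState` would pass `2`, while tests that don't rely on the emitted elements could pass `1` and keep the old `waitForOneMoreCheckpoint` behavior. Note that waiting for two checkpoints only guarantees that one of them was triggered after the call if checkpoints don't overlap; with concurrent checkpoints, the second one could also have been in flight already.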
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
             waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   We should add a comment here explaining why we need to wait for two checkpoints instead of one.

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   AFAIU, we don't need to change it here.

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   I guess the scenario can happen in this test as well, because it's almost the same test implementation as `#testCheckpointRescalingKeyedState`. :thinking:

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   I'm wondering whether the redundant code could be removed here. But that's probably a bit out of scope for this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org