XComp commented on code in PR #24246: URL: https://github.com/apache/flink/pull/24246#discussion_r1478141469
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce // clear the CollectionSink set for the restarted job CollectionSink.clearElementsSet(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: I created a [PR](https://github.com/1996fanrui/flink/pull/9) to show what I had in mind for the code redundancy reduction ########## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ########## @@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n }); } - /** Wait for on more completed checkpoint. */ - public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster) - throws Exception { + /** + * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the + * latest checkpoint start after waitForTwoOrMoreCheckpoint is called. + */ + public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception { Review Comment: Good idea. Checking the trigger time is a better solution. I like that. :+1: ########## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java: ########## @@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n }); } - /** Wait for on more completed checkpoint. */ - public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster) - throws Exception { + /** + * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the + * latest checkpoint start after waitForTwoOrMoreCheckpoint is called. 
+ */ + public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception { Review Comment: About the redundant code: ```java /** Wait for (at least) the given number of successful checkpoints. */ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints) throws Exception { waitForCheckpoints( jobID, miniCluster, checkpointStatsSnapshot -> checkpointStatsSnapshot != null && checkpointStatsSnapshot .getCounts() .getNumberOfCompletedCheckpoints() >= numCheckpoints); } /** * Wait for a new completed checkpoint, the new checkpoint must be triggered after * waitForNewCheckpoint is called. */ public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception { final long startTime = System.currentTimeMillis(); waitForCheckpoints( jobID, miniCluster, checkpointStatsSnapshot -> { if (checkpointStatsSnapshot != null) { final CompletedCheckpointStats latestCompletedCheckpoint = checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint(); return latestCompletedCheckpoint != null && latestCompletedCheckpoint.getTriggerTimestamp() > startTime; } return false; }); } private static void waitForCheckpoints( JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition) throws Exception { waitUntilCondition( () -> { final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get(); final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot(); if (condition.test(snapshot)) { return true; } else if (graph.getState().isGloballyTerminalState()) { checkState( graph.getFailureInfo() != null, "Job terminated (state=%s) before completing the requested checkpoint(s).", graph.getState()); throw graph.getFailureInfo().getException(); } return false; }); } ``` ...just to clarify what I meant. Feel free to ignore that one. 
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -261,7 +263,13 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + // We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint. + // This test will fail if the job recovers from a checkpoint triggered before + // `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling + // `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects + // `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from + // the checkpoint to be the count and sum of all data. Review Comment: ```suggestion // We need to wait for a checkpoint to be completed that was triggered after all the data was processed. That ensures that the entire data is flushed out of the Operator's network buffers, which avoids reprocessing the test data twice after the restore (see FLINK-34200). ``` Just as a proposal to keep it shorter. Referring back to the Jira issue for more context should be good enough. Additionally, adding markdown features might not add much value in JavaDoc. If you want to go for that, you might want to use `{@code }` syntax. ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E // wait until the operator handles some data StateSourceBase.workStartedLatch.await(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: Sounds good :+1: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org