XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1478141469


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   I created a [PR](https://github.com/1996fanrui/flink/pull/9) to show what I 
had in mind for the code redundancy reduction



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
   Good idea. Checking the trigger time is a better solution. I like that. :+1: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
   About the redundant code:
   ```java
       /** Wait for (at least) the given number of successful checkpoints. */
       public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
               throws Exception {
           waitForCheckpoints(
                   jobID,
                   miniCluster,
                   checkpointStatsSnapshot ->
                           checkpointStatsSnapshot != null
                                   && checkpointStatsSnapshot
                                                   .getCounts()
                                                   .getNumberOfCompletedCheckpoints()
                                           >= numCheckpoints);
       }

       /**
        * Wait for a new completed checkpoint, the new checkpoint must be triggered after
        * waitForNewCheckpoint is called.
        */
       public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
           final long startTime = System.currentTimeMillis();
           waitForCheckpoints(
                   jobID,
                   miniCluster,
                   checkpointStatsSnapshot -> {
                       if (checkpointStatsSnapshot != null) {
                           final CompletedCheckpointStats latestCompletedCheckpoint =
                                   checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                           return latestCompletedCheckpoint != null
                                   && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                       }

                       return false;
                   });
       }

       private static void waitForCheckpoints(
               JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
               throws Exception {
           waitUntilCondition(
                   () -> {
                       final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                       final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                       if (condition.test(snapshot)) {
                           return true;
                       } else if (graph.getState().isGloballyTerminalState()) {
                           checkState(
                                   graph.getFailureInfo() != null,
                                   "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                   graph.getState());
                           throw graph.getFailureInfo().getException();
                       }
                       return false;
                   });
       }
   ```
   
   ...just to clarify what I meant. Feel free to ignore that one.
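   For readers following the thread, the core of the trigger-time check can be illustrated with a small, Flink-free sketch. `CheckpointStats` below is a hypothetical stand-in for Flink's `CompletedCheckpointStats`; only the trigger timestamp matters for the predicate:

   ```java
   import java.util.function.Predicate;

   public class TriggerTimeCheckSketch {
       /** Hypothetical stand-in for Flink's CompletedCheckpointStats. */
       record CheckpointStats(long triggerTimestamp) {}

       /** Accepts only checkpoints triggered strictly after startTime. */
       static Predicate<CheckpointStats> newerThan(long startTime) {
           return stats -> stats != null && stats.triggerTimestamp() > startTime;
       }

       public static void main(String[] args) {
           // Pretend this was System.currentTimeMillis() when the wait started.
           long startTime = 1_000L;
           Predicate<CheckpointStats> condition = newerThan(startTime);

           // A checkpoint triggered before the wait started is ignored...
           System.out.println(condition.test(new CheckpointStats(999L)));   // false
           // ...while one triggered afterwards completes the wait.
           System.out.println(condition.test(new CheckpointStats(1_001L))); // true
       }
   }
   ```

   This is what makes `waitForNewCheckpoint` stricter than counting completed checkpoints: a checkpoint that completed before the call can never satisfy the predicate.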



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,13 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
 
             waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            // We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
+            // This test will fail if the job recovers from a checkpoint triggered before
+            // `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
+            // `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
+            // `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
+            // the checkpoint to be the count and sum of all data.

Review Comment:
   ```suggestion
            // We need to wait for a checkpoint to be completed that was triggered after all the
            // data was processed. That ensures all the data is flushed out of the Operator's
            // network buffers, avoiding processing test data twice after the restore (see
            // FLINK-34200).
   ```
   Just a proposal to keep it shorter. Referring back to the Jira issue for more context should be good enough. Additionally, adding Markdown features might not add much value in JavaDoc. If you want to go for that, you might want to use the `{@code }` syntax.
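   For illustration, the `{@code }` syntax mentioned above would look like this in a Javadoc comment (a sketch of the wording only, not part of the suggestion):

   ```java
       /**
        * Waits for a completed checkpoint that was triggered after
        * {@code waitForNewCheckpoint} was called (see FLINK-34200).
        */
   ```

   Unlike Markdown backticks, `{@code }` is rendered as code by the Javadoc tool.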



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   Sounds good :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
