XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1474488421


##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, 
MiniCluster miniCluster, int n
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster 
miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more 
checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster 
miniCluster) throws Exception {

Review Comment:
   ```suggestion
       public static void waitForNewCheckpoint(JobID jobID, MiniCluster 
miniCluster, int checkpointCount) throws Exception {
   ```
   Can't we make the number of checkpoints to wait for configurable? That way, 
we can pass in `2` in the test implementation analogously to 
`waitForCheckpoint`. I also feel like we can remove some redundant code within 
the two methods. :thinking: 



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void 
testCheckpointRescalingNonPartitionedStateCausesException() throws E
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   So far, we've only seen the issue in `#testCheckpointRescalingInKeyedState`. 
We don't need the two checkpoints here, actually, because we're not relying on 
elements in the test. We could keep the tests functionality if we make the 
`waitForOneMoreCheckpoint` configurable as suggested one of my previous 
comments.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean 
scaleOut) throws Exception
 
             waitForAllTaskRunning(cluster.getMiniCluster(), 
jobGraph.getJobID(), false);
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   We should add a comment here explaining why we need to wait for 2 instead of 
one checkpoint.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   AFAIU, we don't need to change it here.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void 
testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   I guess the scenario can happen in this test as well because it's almost the 
same test implementation as in `#testCheckpointRescalingKeyedState` :thinking: 



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void 
testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   I'm wondering whether the redundant code could be removed here. But that's 
probably a bit out-of-scope for this issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to