masteryhx commented on code in PR #20404:
URL: https://github.com/apache/flink/pull/20404#discussion_r935086496


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -91,39 +94,50 @@ public void testSwitchFromDisablingToEnablingInClaimMode() 
throws Exception {
         MiniCluster miniCluster = cluster.getMiniCluster();
         StreamExecutionEnvironment env1 =
                 getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
-        JobGraph firstJobGraph = buildJobGraph(env1);
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger(0));
+        JobGraph firstJobGraph =
+                buildJobGraph(env1, TOTAL_ELEMENTS / 5, TOTAL_ELEMENTS / 4, 
currentCheckpointNum);
+
+        try {
+            miniCluster.submitJob(firstJobGraph).get();
+            miniCluster.requestJobResult(firstJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            Preconditions.checkState(
+                    ExceptionUtils.findThrowable(ex, 
ArtificialFailure.class).isPresent());
+        }
 
-        miniCluster.submitJob(firstJobGraph).get();
-        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
-        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
-        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
         String firstRestorePath =
                 getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
 
-        // 1st restore, switch from disable to enable
+        // 1st restore, switch from disable to enable. Make 
materializationInterval = 600000, to
+        // avoid materialization.
         File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
         StreamExecutionEnvironment env2 =
                 getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 600000);
-        JobGraph secondJobGraph = buildJobGraph(env2);
+        JobGraph secondJobGraph =
+                buildJobGraph(env2, TOTAL_ELEMENTS / 3, TOTAL_ELEMENTS / 2, 
currentCheckpointNum);
         setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+        try {
+            miniCluster.submitJob(secondJobGraph).get();
+            miniCluster.requestJobResult(secondJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            Preconditions.checkState(
+                    ExceptionUtils.findThrowable(ex, 
ArtificialFailure.class).isPresent());
+        }
 
-        miniCluster.submitJob(secondJobGraph).get();
-        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
-        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
-        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
         String secondRestorePath =
                 getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), 
miniCluster).get();
 
         // 2nd restore, private state of first restore checkpoint still exist.
         File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
         StreamExecutionEnvironment env3 =
                 getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 
100, 100);
-        JobGraph thirdJobGraph = buildJobGraph(env3);
+        JobGraph thirdJobGraph =
+                buildJobGraph(env3, TOTAL_ELEMENTS, Integer.MAX_VALUE, 
currentCheckpointNum);
         setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
         miniCluster.submitJob(thirdJobGraph).get();
-        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
-        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
-        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+        miniCluster.requestJobResult(thirdJobGraph.getJobID()).get();

Review Comment:
   Since we have refactored this test to be consistent with the other cases that 
use the same `JobGraph`, I think we could also use `waitAndAssert` here to check 
the correctness of the result?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -91,39 +94,50 @@ public void testSwitchFromDisablingToEnablingInClaimMode() 
throws Exception {
         MiniCluster miniCluster = cluster.getMiniCluster();
         StreamExecutionEnvironment env1 =
                 getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
-        JobGraph firstJobGraph = buildJobGraph(env1);
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger(0));
+        JobGraph firstJobGraph =
+                buildJobGraph(env1, TOTAL_ELEMENTS / 5, TOTAL_ELEMENTS / 4, 
currentCheckpointNum);
+
+        try {
+            miniCluster.submitJob(firstJobGraph).get();
+            miniCluster.requestJobResult(firstJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            Preconditions.checkState(
+                    ExceptionUtils.findThrowable(ex, 
ArtificialFailure.class).isPresent());
+        }
 
-        miniCluster.submitJob(firstJobGraph).get();
-        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
-        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
-        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
         String firstRestorePath =
                 getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
 
-        // 1st restore, switch from disable to enable
+        // 1st restore, switch from disable to enable. Make 
materializationInterval = 600000, to

Review Comment:
   Actually, we can currently set `materializationInterval` to a negative value 
to avoid materialization altogether.
   See the description of 
`StateChangelogOptions#PERIODIC_MATERIALIZATION_INTERVAL`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to