akalash commented on a change in pull request #18332:
URL: https://github.com/apache/flink/pull/18332#discussion_r783137704



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -1175,6 +1185,31 @@ private void terminateMiniClusterServices() throws Exception {
         }
     }
 
+    /**
+     * Prevent multiple submission of the same JobGraph that has been mutated in between

Review comment:
       I agree that the contract looks strange. Why can't we simply clone the
JobGraph in submitJob? (Ideally we would not clone at all and would use a
builder or something similar instead, but as I understand it, that is not easy
to do right now.)
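
For illustration, a minimal sketch of what "clone on submit" could look like. It does not use Flink classes: `FakeJobGraph`, `FakeCluster`, and `deepCopy` are hypothetical stand-ins, and the copy is a plain Java serialization round trip (which, as far as I understand, is also how `InstantiationUtil.clone` works). The point is only that a defensive copy taken at submission time makes later mutation of the caller's object harmless.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class CloneBeforeSubmitSketch {

    /** Hypothetical stand-in for a mutable, serializable job description. */
    static final class FakeJobGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        String jobId;
        String restorePath; // null means "no restore settings"

        FakeJobGraph(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Deep copy via a Java serialization round trip; failures become unchecked. */
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not clone object", e);
        }
    }

    /** Hypothetical cluster that keeps its own copy of every submitted graph. */
    static final class FakeCluster {
        private final Map<String, FakeJobGraph> submitted = new HashMap<>();

        void submitJob(FakeJobGraph graph) {
            // Copy first, so the caller may keep mutating its own instance.
            submitted.put(graph.jobId, deepCopy(graph));
        }

        FakeJobGraph get(String jobId) {
            return submitted.get(jobId);
        }
    }

    public static void main(String[] args) {
        FakeJobGraph graph = new FakeJobGraph("job-1");
        FakeCluster cluster = new FakeCluster();
        cluster.submitJob(graph);

        // Mutate the caller's instance after submission.
        graph.jobId = "job-2";
        graph.restorePath = "/some/checkpoint";

        // The submitted copy still carries the original values.
        System.out.println(cluster.get("job-1").jobId);
        System.out.println(cluster.get("job-1").restorePath);
    }
}
```

With this shape the caller would not need to clone in the test at all; the trade-off is one serialization round trip per submission.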

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -431,36 +430,48 @@ public void invoke(Long value) {
                                 .build());
         cluster.before();
         try {
-            final JobID jobID1 = new JobID();
-            jobGraph.setJobID(jobID1);
-            cluster.getClusterClient().submitJob(jobGraph).get();
-            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobID1, false);
+            final JobID firstJobId = new JobID();
+            final JobGraph firstJobGraph = InstantiationUtil.clone(jobGraph);
+            firstJobGraph.setJobID(firstJobId);
+            cluster.getClusterClient().submitJob(firstJobGraph).get();
+            CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), firstJobId, false);
             // wait for some records to be processed before taking the checkpoint
             counter.get().await();
-            final String firstCheckpoint = cluster.getMiniCluster().triggerCheckpoint(jobID1).get();
-
-            cluster.getClusterClient().cancel(jobID1).get();
-            jobGraph.setSavepointRestoreSettings(
+            final String firstCheckpoint =
+                    cluster.getMiniCluster().triggerCheckpoint(firstJobId).get();
+            cluster.getClusterClient().cancel(firstJobId).get();
+            CommonTestUtils.waitForJobStatus(

Review comment:
       I don't fully understand why we need this here. If we clone the JobGraph,
everything works fine even without this wait. It seems to me that awaiting the
future returned by `cancel` is enough to be sure that the next submission with
the new JobGraph will succeed. Did I miss some rare race condition?
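
To make the question concrete, here is a purely hypothetical model of the kind of race I have in mind. It is not Flink code, and whether the real `cancel` future behaves this way is exactly what I am asking: `FakeJob`, `Status`, and `waitForStatus` are invented for illustration. In the model, the cancel future completes as soon as the request is acknowledged, while the terminal status is only reached later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CancelAckSketch {

    enum Status { RUNNING, CANCELLING, CANCELED }

    /** Hypothetical job whose cancel() only acknowledges the request. */
    static final class FakeJob {
        final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);

        CompletableFuture<Void> cancel() {
            // Assumption of this model: the future signals "request accepted",
            // not "job has reached a terminal state".
            status.set(Status.CANCELLING);
            return CompletableFuture.completedFuture(null);
        }

        /** Called later by the (simulated) runtime once cleanup is done. */
        void finishCancellation() {
            status.set(Status.CANCELED);
        }
    }

    /** Polls until the job reports the expected status or the timeout expires. */
    static void waitForStatus(FakeJob job, Status expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (job.status.get() != expected) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Timed out waiting for " + expected);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        FakeJob job = new FakeJob();
        job.cancel().join();
        // In this model the job is not yet terminal after the future completes.
        System.out.println(job.status.get());

        // Simulate the runtime finishing the cancellation a little later.
        new Thread(job::finishCancellation).start();
        waitForStatus(job, Status.CANCELED, 5_000);
        System.out.println(job.status.get());
    }
}
```

If the real future only completes once the job is terminal, the extra wait is redundant and my question stands.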





