morhidi commented on code in PR #237:
URL: https://github.com/apache/flink-kubernetes-operator/pull/237#discussion_r881419989


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -618,32 +623,51 @@ public SavepointFetchResult fetchSavepointInfo(
             savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
             savepointStatusMessageParameters.triggerIdPathParameter.resolve(
                     TriggerId.fromHexString(triggerId));
-            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
-                    clusterClient.sendRequest(
-                            savepointStatusHeaders,
-                            savepointStatusMessageParameters,
-                            EmptyRequestBody.getInstance());
-
-            if (response.get() == null || response.get().resource() == null) {
-                return SavepointFetchResult.pending();
-            }
-
-            if (response.get().resource().getLocation() == null) {
-                if (response.get().resource().getFailureCause() != null) {
-                    LOG.error(
-                            "Failure occurred while fetching the savepoint result",
-                            response.get().resource().getFailureCause());
-                    return SavepointFetchResult.error(
-                            response.get().resource().getFailureCause().toString());
-                } else {
+            while (true) {
+                CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
+                        clusterClient.sendRequest(
+                                savepointStatusHeaders,
+                                savepointStatusMessageParameters,
+                                EmptyRequestBody.getInstance());
+
+                if (response.get() == null || response.get().resource() == null) {
                     return SavepointFetchResult.pending();
                 }
+
+                if (response.get().resource().getLocation() == null) {
+                    // If the failure cause contains 'Not all required tasks are currently
+                    // running.', then continue until savepoint is successfully fetched.
+                    if (response.get().resource().getFailureCause() != null) {

Review Comment:
   Once we get back the error `Aborting checkpoint. Failure reason: Not all required tasks are currently running.`, the checkpoint has been aborted and will never succeed, so requesting the status again for the same trigger cannot help.
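
A minimal sketch of this point, not the operator's actual code: any reported failure cause is treated as terminal for the current trigger, so the caller reports an error instead of polling again. `SavepointStatusSketch`, `FetchOutcome` and `classify` are hypothetical names used only for illustration; the location / failure-cause distinction is the one visible in the diff above.

```java
public class SavepointStatusSketch {

    /** Hypothetical stand-in for the three outcomes of one status poll. */
    public enum FetchOutcome { PENDING, ERROR, COMPLETED }

    /**
     * Maps one status response to an outcome.
     * A non-null failure cause is terminal: the aborted checkpoint will not
     * complete later, so there is nothing to wait for.
     */
    public static FetchOutcome classify(String location, String failureCause) {
        if (location != null) {
            return FetchOutcome.COMPLETED;
        }
        if (failureCause != null) {
            return FetchOutcome.ERROR; // do not retry the same trigger
        }
        return FetchOutcome.PENDING;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, null));
        System.out.println(classify(null, "Not all required tasks are currently running."));
        System.out.println(classify("file:///tmp/savepoint-1", null));
    }
}
```

With this mapping, recovering from the failure means triggering a new savepoint, not re-reading the status of the failed one.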



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -618,32 +623,51 @@ public SavepointFetchResult fetchSavepointInfo(
             savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
             savepointStatusMessageParameters.triggerIdPathParameter.resolve(
                     TriggerId.fromHexString(triggerId));
-            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
-                    clusterClient.sendRequest(
-                            savepointStatusHeaders,
-                            savepointStatusMessageParameters,
-                            EmptyRequestBody.getInstance());
-
-            if (response.get() == null || response.get().resource() == null) {
-                return SavepointFetchResult.pending();
-            }
-
-            if (response.get().resource().getLocation() == null) {
-                if (response.get().resource().getFailureCause() != null) {
-                    LOG.error(
-                            "Failure occurred while fetching the savepoint result",
-                            response.get().resource().getFailureCause());
-                    return SavepointFetchResult.error(
-                            response.get().resource().getFailureCause().toString());
-                } else {
+            while (true) {

Review Comment:
   We cannot block execution here; it is better to return and check again in the next reconcile loop.
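
A rough sketch of the non-blocking alternative, under stated assumptions: each reconcile pass polls the status once and returns immediately, leaving it to the next pass to check again. `ReconcilePollSketch` and its fake response list are hypothetical and stand in for the real REST call; a `null` entry means the savepoint is still in progress.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReconcilePollSketch {

    // Hypothetical fake status endpoint: one entry per poll, null = still pending.
    private final Iterator<String> responses;
    private String savepointLocation;

    public ReconcilePollSketch(List<String> responses) {
        this.responses = responses.iterator();
    }

    /**
     * One reconcile pass: a single status poll, no waiting and no loop.
     * Returns true once the savepoint location is known.
     */
    public boolean reconcile() {
        if (savepointLocation != null) {
            return true; // already fetched in an earlier pass
        }
        String location = responses.hasNext() ? responses.next() : null;
        if (location == null) {
            return false; // still pending; the next reconcile pass checks again
        }
        savepointLocation = location;
        return true;
    }

    public String getSavepointLocation() {
        return savepointLocation;
    }

    public static void main(String[] args) {
        ReconcilePollSketch sketch =
                new ReconcilePollSketch(Arrays.asList(null, null, "file:///tmp/savepoint-1"));
        // Each call stands in for one invocation by the reconcile loop.
        System.out.println(sketch.reconcile());
        System.out.println(sketch.reconcile());
        System.out.println(sketch.reconcile());
        System.out.println(sketch.getSavepointLocation());
    }
}
```

The pending state lives in the object between passes, so no thread is held while the savepoint completes.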



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
