morhidi commented on code in PR #237: URL: https://github.com/apache/flink-kubernetes-operator/pull/237#discussion_r881419989
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java: ########## @@ -618,32 +623,51 @@ public SavepointFetchResult fetchSavepointInfo( savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId)); savepointStatusMessageParameters.triggerIdPathParameter.resolve( TriggerId.fromHexString(triggerId)); - CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response = - clusterClient.sendRequest( - savepointStatusHeaders, - savepointStatusMessageParameters, - EmptyRequestBody.getInstance()); - - if (response.get() == null || response.get().resource() == null) { - return SavepointFetchResult.pending(); - } - - if (response.get().resource().getLocation() == null) { - if (response.get().resource().getFailureCause() != null) { - LOG.error( - "Failure occurred while fetching the savepoint result", - response.get().resource().getFailureCause()); - return SavepointFetchResult.error( - response.get().resource().getFailureCause().toString()); - } else { + while (true) { + CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response = + clusterClient.sendRequest( + savepointStatusHeaders, + savepointStatusMessageParameters, + EmptyRequestBody.getInstance()); + + if (response.get() == null || response.get().resource() == null) { return SavepointFetchResult.pending(); } + + if (response.get().resource().getLocation() == null) { + // If the failure cause contains 'Not all required tasks are currently + // running.', then continue until savepoint is successfully fetched. + if (response.get().resource().getFailureCause() != null) { Review Comment: Once we get back the error `Aborting checkpoint. Failure reason: Not all required tasks are currently running.`, the checkpoint will never succeed. 
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java: ########## @@ -618,32 +623,51 @@ public SavepointFetchResult fetchSavepointInfo( savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId)); savepointStatusMessageParameters.triggerIdPathParameter.resolve( TriggerId.fromHexString(triggerId)); - CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response = - clusterClient.sendRequest( - savepointStatusHeaders, - savepointStatusMessageParameters, - EmptyRequestBody.getInstance()); - - if (response.get() == null || response.get().resource() == null) { - return SavepointFetchResult.pending(); - } - - if (response.get().resource().getLocation() == null) { - if (response.get().resource().getFailureCause() != null) { - LOG.error( - "Failure occurred while fetching the savepoint result", - response.get().resource().getFailureCause()); - return SavepointFetchResult.error( - response.get().resource().getFailureCause().toString()); - } else { + while (true) { Review Comment: We cannot block execution here; it is better to wait for another reconcile loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org