rmetzger commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r480067151



##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,48 @@ public static void executeProgram(
                        Thread.currentThread().setContextClassLoader(contextClassLoader);
                }
        }
+
+       /**
+        * This method blocks until the job status is not INITIALIZING anymore.
+        * If the job is FAILED, it throws an CompletionException with the failure cause.
+        * @param jobStatusSupplier supplier returning the job status.
+        */
+       public static void waitUntilJobInitializationFinished(
+                               JobID jobID,
+                               SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+                               SupplierWithException<JobResult, Exception> jobResultSupplier,
+                               ClassLoader userCodeClassloader)
+                       throws JobInitializationException {
+               LOG.debug("Wait until job initialization is finished");
+               WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+               try {
+                       JobStatus status = jobStatusSupplier.get();
+                       long attempt = 0;
+                       while (status == JobStatus.INITIALIZING) {
+                               Thread.sleep(waitStrategy.sleepTime(attempt++));
+                               status = jobStatusSupplier.get();
+                       }
+                       if (status == JobStatus.FAILED) {
+                               JobResult result = jobResultSupplier.get();
+                               Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+                               if (throwable.isPresent()) {
+                                       Throwable t = throwable.get().deserializeError(userCodeClassloader);
+                                       t = ExceptionUtils.stripCompletionException(t);
+                                       if (t instanceof JobInitializationException) {
+                                               throw t;
+                                       }
+                               }
+                       }
+               } catch (JobInitializationException initializationException) {
+                       throw initializationException;
+               } catch (Throwable throwable) {
+                       ExceptionUtils.checkInterrupted(throwable);
+                       throw new JobInitializationException(jobID, "Error while waiting for job to be initialized", throwable);
+               }
+       }
+
+       public static <T> void waitUntilJobInitializationFinished(JobID id, ClusterClient<T> client, ClassLoader userCodeClassloader) throws
+               JobInitializationException {
+               waitUntilJobInitializationFinished(id, () -> client.getJobStatus(id).get(), () -> client.requestJobResult(id).get(), userCodeClassloader);

Review comment:
       As far as I understand, per-job clusters submit jobs through
`AbstractJobClusterExecutor.execute()`, and that code path never calls the
`waitUntilJobInitializationFinished` utility.
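
For reference, the polling pattern in the diff can be sketched in isolation. This is a minimal sketch, not Flink code: `InitializationPoller`, its `Status` enum, and `sleepTime` are hypothetical stand-ins, and the backoff rule (double the wait on each attempt, capped at a maximum) is an assumption about how `ExponentialWaitStrategy(50, 2000)` behaves.

```java
import java.util.function.Supplier;

public class InitializationPoller {

    /** Hypothetical stand-in for the job status; only the values needed here. */
    public enum Status { INITIALIZING, RUNNING, FAILED }

    /**
     * Assumed backoff rule: initialMs * 2^attempt, capped at maxMs.
     * Intended for small initial waits such as the 50 ms used in the diff.
     */
    public static long sleepTime(long attempt, long initialMs, long maxMs) {
        // Clamp the shift so large attempt counts cannot overflow the long.
        long shift = Math.min(attempt, 20);
        return Math.min(maxMs, initialMs << shift);
    }

    /** Polls the supplier until the status is no longer INITIALIZING, then returns it. */
    public static Status waitUntilInitialized(
            Supplier<Status> statusSupplier, long initialMs, long maxMs)
            throws InterruptedException {
        Status status = statusSupplier.get();
        long attempt = 0;
        while (status == Status.INITIALIZING) {
            Thread.sleep(sleepTime(attempt++, initialMs, maxMs));
            status = statusSupplier.get();
        }
        return status;
    }
}
```

A submission path that does not go through such a helper would need to call it explicitly after submitting the job, or it would return while the job may still be initializing.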





