Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242267 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } - private JobResult waitForJobExecutionResult( - final JobID jobId) throws ProgramInvocationException { - - final JobMessageParameters messageParameters = new JobMessageParameters(); - messageParameters.jobPathParameter.resolve(jobId); - JobExecutionResultResponseBody jobExecutionResultResponseBody; - try { - long attempt = 0; - do { - final CompletableFuture<JobExecutionResultResponseBody> responseFuture = - restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - JobExecutionResultHeaders.getInstance(), - messageParameters); - jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - Thread.sleep(waitStrategy.sleepTime(attempt)); - attempt++; - } - while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); - } catch (IOException | TimeoutException | ExecutionException e) { - throw new ProgramInvocationException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProgramInvocationException(e); + private <R, T extends AsynchronouslyCreatedResource<R>> R waitForResource( + final SupplierWithException<CompletableFuture<T>, IOException> resourceFutureSupplier) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + T asynchronouslyCreatedResource; + long attempt = 0; + do { + final CompletableFuture<T> responseFuture = resourceFutureSupplier.get(); + asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); --- End diff -- fixed
---