GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171228120
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
                }
        }
     
    +   /**
    +    * Retry the given operation with the given delay in between successful completions where the
    +    * result does not match a given predicate.
    +    *
    +    * @param operation to retry
    +    * @param retries number of retries
    +    * @param retryDelay delay between retries
    +    * @param retryPredicate Predicate to test whether the result is acceptable
    +    * @param scheduledExecutor executor to be used for the retry operation
    +    * @param <T> type of the result
    +    * @return Future which retries the given operation a given amount of times and delays the retry
    +    *   in case the predicate isn't matched
    +    */
    +   public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +           final Supplier<CompletableFuture<T>> operation,
    +           final long retries,
    +           final Time retryDelay,
    +           final Predicate<T> retryPredicate,
    +           final ScheduledExecutor scheduledExecutor) {
    +
    +           final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +           retrySuccessfulOperationWithDelay(
    +                   resultFuture,
    +                   operation,
    +                   retries,
    +                   retryDelay,
    +                   retryPredicate,
    +                   scheduledExecutor);
    +
    +           return resultFuture;
    +   }
    +
    +   private static <T> void retrySuccessfulOperationWithDelay(
    +           final CompletableFuture<T> resultFuture,
    +           final Supplier<CompletableFuture<T>> operation,
    +           final long retries,
    +           final Time retryDelay,
    +           final Predicate<T> retryPredicate,
    +           final ScheduledExecutor scheduledExecutor) {
    +
    +           if (!resultFuture.isDone()) {
    +                   final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +                   operationResultFuture.whenComplete(
    +                           (t, throwable) -> {
    +                                   if (throwable != null) {
    +                                           if (throwable instanceof CancellationException) {
    +                                                   resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +                                           } else {
    +                                                   resultFuture.completeExceptionally(throwable);
    +                                           }
    +                                   } else {
    +                                           if (retries > 0 && !retryPredicate.test(t)) {
    +                                                   final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +                                                           () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +                                                           retryDelay.toMilliseconds(),
    +                                                           TimeUnit.MILLISECONDS);
    +
    +                                                   resultFuture.whenComplete(
    +                                                           (innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +                                           } else {
    +                                                   resultFuture.complete(t);
    --- End diff --
    
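    This seems rather unfortunate: once the retries are exhausted, the future is completed with the last result even if it doesn't match the predicate. In the code sample below I wait for a job to be RUNNING. I can only replace the loop, but not the final check, which really limits the usefulness: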
    ```
    JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
        Thread.sleep(50);
        jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    }
    
    if (jobStatus != JobStatus.RUNNING) {
        Assert.fail("Job not in state RUNNING.");
    }
    
    ================== Comparison ===============================
    
    CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
        () -> client.getJobStatus(jobSubmissionResult.getJobID()),
        submissionDeadLine.timeLeft().toMillis() / 50, // crappy retry count calculation
        Time.milliseconds(50),
        status -> status == JobStatus.RUNNING,
        TestingUtils.defaultScheduledExecutor()
    );
    
    if (jobStatusFuture.get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS) != JobStatus.RUNNING) {
        Assert.fail("Job not in state RUNNING.");
    }
    ```
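    Below is a minimal sketch of the alternative semantics implied above. The future fails once the retries are exhausted, instead of completing with a value that doesn't match the predicate. It assumes plain `java.util.concurrent` types in place of Flink's `Time`, `ScheduledExecutor` and `RetryException`. `RetryUntilSketch`, `retryUntil` and `RetriesExhaustedException` are hypothetical names. Unlike the diff, the sketch does not cancel a pending retry when the result future is completed externally. It only skips the retry when it fires.

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    /** Hypothetical helper; not part of Flink's FutureUtils. */
    final class RetryUntilSketch {

        /** Thrown when no attempt produced a result accepted by the predicate. */
        static final class RetriesExhaustedException extends RuntimeException {
            RetriesExhaustedException(String message) {
                super(message);
            }
        }

        static <T> CompletableFuture<T> retryUntil(
                Supplier<CompletableFuture<T>> operation,
                long retries,
                long delayMillis,
                Predicate<T> acceptable,
                ScheduledExecutorService executor) {
            CompletableFuture<T> result = new CompletableFuture<>();
            attempt(result, operation, retries, delayMillis, acceptable, executor);
            return result;
        }

        private static <T> void attempt(
                CompletableFuture<T> result,
                Supplier<CompletableFuture<T>> operation,
                long retriesLeft,
                long delayMillis,
                Predicate<T> acceptable,
                ScheduledExecutorService executor) {
            if (result.isDone()) {
                return; // completed or cancelled by the caller in the meantime
            }
            operation.get().whenComplete((value, failure) -> {
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else if (acceptable.test(value)) {
                    result.complete(value);
                } else if (retriesLeft > 0) {
                    executor.schedule(
                        () -> attempt(result, operation, retriesLeft - 1, delayMillis, acceptable, executor),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
                } else {
                    // Key difference from the diff: fail instead of completing with the last value.
                    result.completeExceptionally(new RetriesExhaustedException(
                        "Result did not match the predicate; last value: " + value));
                }
            });
        }
    }
    ```

    With these semantics the comparison above would reduce to a single `get(...)` call. A job that never reaches RUNNING would surface as an `ExecutionException` instead of requiring the trailing `Assert.fail` check.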
    
    In its current form this doesn't really provide value, imo. (Yes, it's asynchronous, but in which tests do we even really need that?)
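    If most tests don't need the asynchrony, a plain blocking helper that is driven by a deadline rather than a retry count would also avoid the retry count calculation flagged in the sample. The following is a minimal sketch under that assumption. `PollUntilSketch` and `pollUntil` are hypothetical names, not an existing Flink utility.

    ```java
    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    /** Hypothetical blocking test helper; not an existing Flink utility. */
    final class PollUntilSketch {

        /**
         * Calls {@code probe} until its result satisfies {@code acceptable}, sleeping {@code pause}
         * between attempts, and throws a TimeoutException if the deadline passes first.
         */
        static <T> T pollUntil(
                Supplier<T> probe,
                Predicate<T> acceptable,
                Duration timeout,
                Duration pause) throws InterruptedException, TimeoutException {
            final long deadline = System.nanoTime() + timeout.toNanos();
            T last = probe.get();
            while (!acceptable.test(last)) {
                // Overflow-safe comparison of System.nanoTime() values.
                if (System.nanoTime() - deadline >= 0) {
                    throw new TimeoutException(
                            "Condition not met within " + timeout + "; last value: " + last);
                }
                Thread.sleep(pause.toMillis());
                last = probe.get();
            }
            return last;
        }
    }
    ```

    The remaining time can be passed in directly, e.g. `Duration.ofMillis(submissionDeadLine.timeLeft().toMillis())`, so no retry count has to be derived from it. Note that `Supplier` cannot throw checked exceptions, so a blocking `getJobStatus(...).get(...)` call would need to be wrapped before it can serve as the probe.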