Myasuka commented on a change in pull request #18931:
URL: https://github.com/apache/flink/pull/18931#discussion_r817347164



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -102,35 +129,48 @@ public void close() throws Exception {
          */
         private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);

Review comment:
   The relationship between `actionCompleted` and `attemptCompleted` is really
   important within each `RetriableTask`. I think we can improve readability
   with the following steps:
   
   1. Keep only one private constructor, drop the previous public constructor,
   and initialize `attemptCompleted` inside the constructor:
   ~~~java
           private RetriableTask(
                   int current,
                   AtomicBoolean actionCompleted,
                   RetriableAction runnable,
                   RetryPolicy retryPolicy,
                   ScheduledExecutorService blockingExecutor,
                   Histogram attemptsPerTaskHistogram, // I think we could move this parameter to the last position
                   ScheduledExecutorService scheduler,
                   Consumer<Throwable> failureCallback,
                   AtomicInteger activeAttempts) {
               this.current = current;
               this.runnable = runnable;
               this.failureCallback = failureCallback;
               this.retryPolicy = retryPolicy;
               this.blockingExecutor = blockingExecutor;
               this.actionCompleted = actionCompleted;
               this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
               this.scheduler = scheduler;
               this.activeAttempts = activeAttempts;
               this.attemptCompleted = new AtomicBoolean(false);
           }
   ~~~
   
   2. Introduce a static method to create the first `RetriableTask`:
   ~~~java
           private static RetriableTask createRetriableAttempts(
                   RetriableAction runnable,
                   RetryPolicy retryPolicy,
                   ScheduledExecutorService blockingExecutor,
                   Histogram attemptsPerTaskHistogram,
                   ScheduledExecutorService scheduler,
                   Consumer<Throwable> failureCallback) {
               return new RetriableTask(
                       1,
                       new AtomicBoolean(false),
                       runnable,
                       retryPolicy,
                       blockingExecutor,
                       attemptsPerTaskHistogram,
                       scheduler,
                       failureCallback,
                       new AtomicInteger(1));
           }
   ~~~
   
   3. Rename the `next()` method to `nextAttempt()`:
   ~~~java
           private RetriableTask nextAttempt() {
               return new RetriableTask(
                       current + 1,
                       actionCompleted,
                       runnable,
                       retryPolicy,
                       blockingExecutor,
                       attemptsPerTaskHistogram,
                       scheduler,
                       failureCallback,
                       activeAttempts);
           }
   ~~~
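
To make the intent concrete, here is a minimal, self-contained sketch of the ownership split behind the three steps above. `AttemptSketch` and its methods are hypothetical names, not code from this PR. It keeps only `current`, `actionCompleted`, `activeAttempts`, and `attemptCompleted`; the executors, retry policy, histogram, and failure callback are left out on purpose:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down stand-in for RetriableTask (illustration only).
final class AttemptSketch {
    private final int current;                    // 1-based attempt number
    private final AtomicBoolean actionCompleted;  // shared by all attempts of one action
    private final AtomicInteger activeAttempts;   // shared by all attempts of one action
    private final AtomicBoolean attemptCompleted; // owned by this attempt only

    // Single private constructor: per-attempt state is always created here.
    private AttemptSketch(
            int current, AtomicBoolean actionCompleted, AtomicInteger activeAttempts) {
        this.current = current;
        this.actionCompleted = actionCompleted;
        this.activeAttempts = activeAttempts;
        this.attemptCompleted = new AtomicBoolean(false);
    }

    // First attempt: allocates the state that later attempts will share.
    static AttemptSketch createFirstAttempt() {
        return new AttemptSketch(1, new AtomicBoolean(false), new AtomicInteger(1));
    }

    // Follow-up attempt: reuses the shared state, gets a fresh attemptCompleted.
    AttemptSketch nextAttempt() {
        return new AttemptSketch(current + 1, actionCompleted, activeAttempts);
    }

    int attemptNumber() {
        return current;
    }

    int activeAttempts() {
        return activeAttempts.get();
    }

    // Marks this attempt as finished; returns true only for the first caller.
    boolean completeAttempt() {
        return attemptCompleted.compareAndSet(false, true);
    }

    // Marks the whole action as finished; true only once across all attempts.
    boolean completeAction() {
        return actionCompleted.compareAndSet(false, true);
    }

    boolean isAttemptCompleted() {
        return attemptCompleted.get();
    }

    boolean isActionCompleted() {
        return actionCompleted.get();
    }

    public static void main(String[] args) {
        AttemptSketch first = createFirstAttempt();
        AttemptSketch second = first.nextAttempt();
        second.completeAttempt();
        second.completeAction();
        // The per-attempt flag of `first` is untouched, the shared flag is not.
        System.out.println("first attempt completed: " + first.isAttemptCompleted());
        System.out.println("action completed (seen from first): " + first.isActionCompleted());
    }
}
```

With a single constructor, there is exactly one place where `attemptCompleted` is created, so a reader can see at a glance that it is never shared between attempts, while `actionCompleted` and `activeAttempts` only ever enter through the factory method or `nextAttempt()`.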
   
   What do you think of these proposals?



