1996fanrui commented on code in PR #23867:
URL: https://github.com/apache/flink/pull/23867#discussion_r1438066538


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java:
##########
@@ -214,28 +266,111 @@ void testMultipleSettings() throws Exception {
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
 
-        clock.advanceTime(3, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
-        assertThat(restartStrategy.canRestart()).isTrue();
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L);
-
-        clock.advanceTime(7, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
-        assertThat(restartStrategy.canRestart()).isTrue();
-        assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L, 
9L);
-
         // ensure backoff is reset after threshold is reached
         clock.advanceTime(resetBackoffThresholdMS + 9 + 1, 
TimeUnit.MILLISECONDS);
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isOne();
+        clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
         // ensure backoff still increases
         restartStrategy.notifyFailure(failure);
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
     }
 
+    @Test
+    void testMergeMultipleExceptionsIntoOneAttempt() {
+        ManualClock clock = new ManualClock();
+        long initialBackoffMS = 2L;
+        double backoffMultiplier = 2.0d;
+        final long maxBackoffMS = 6L;
+        final long resetBackoffThresholdMS = 80L;
+
+        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+                new ExponentialDelayRestartBackoffTimeStrategy(
+                        clock,
+                        initialBackoffMS,
+                        maxBackoffMS,
+                        backoffMultiplier,
+                        resetBackoffThresholdMS,
+                        0.d,
+                        3);
+
+        // All exceptions merged into one attempt if the time is same.
+        long currentBackOffMs = initialBackoffMS;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+        // After advance time it's a new round, so new exception will be a new 
attempt.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        currentBackOffMs *= backoffMultiplier;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+        // After advance time it's a new round, so new exception will be a new 
attempt.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        currentBackOffMs = maxBackoffMS;
+        checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+        // After advance time it's a new round, and it reaches the maxAttempts.
+        clock.advanceTime(1, TimeUnit.MILLISECONDS);
+        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.canRestart()).isFalse();
+    }
+
+    @Test
+    void testMergingExceptionsWorksWithResetting() {
+        ManualClock clock = new ManualClock();
+        long initialBackoffMS = 2L;
+        double backoffMultiplier = 2.0d;
+        final long maxBackoffMS = 6L;
+        final long resetBackoffThresholdMS = 80L;
+
+        final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
+                new ExponentialDelayRestartBackoffTimeStrategy(
+                        clock,
+                        initialBackoffMS,
+                        maxBackoffMS,
+                        backoffMultiplier,
+                        resetBackoffThresholdMS,
+                        0.d,
+                        3);
+
+        // Test the merging logic works well after a series of resetting.
+        for (int i = 0; i < 10; i++) {
+            // All exceptions merged into one attempt if the time is same.
+            long currentBackOffMs = initialBackoffMS;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+            // After advance time it's a new round, so new exception will be a 
new attempt.
+            clock.advanceTime(1, TimeUnit.MILLISECONDS);
+            currentBackOffMs *= backoffMultiplier;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+            // After advance time it's a new round, so new exception will be a 
new attempt.
+            clock.advanceTime(1, TimeUnit.MILLISECONDS);
+            currentBackOffMs = maxBackoffMS;
+            checkMultipleExceptionsAreMerged(clock, currentBackOffMs, 
restartStrategy);
+
+            // After resetBackoffThresholdMS+1, the restartStrategy should be 
reset.
+            clock.advanceTime(resetBackoffThresholdMS + 1, 
TimeUnit.MILLISECONDS);
+        }

Review Comment:
   Thanks for the review! Updated~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to