1996fanrui commented on code in PR #23867: URL: https://github.com/apache/flink/pull/23867#discussion_r1438066538
########## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java: ########## @@ -214,28 +266,111 @@ void testMultipleSettings() throws Exception { assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); - clock.advanceTime(3, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); - assertThat(restartStrategy.canRestart()).isTrue(); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 3L, 4L, 5L); - - clock.advanceTime(7, TimeUnit.MILLISECONDS); - restartStrategy.notifyFailure(failure); - assertThat(restartStrategy.canRestart()).isTrue(); - assertCorrectRandomRange(restartStrategy::getBackoffTime, 6L, 7L, 8L, 9L); - // ensure backoff is reset after threshold is reached clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS); restartStrategy.notifyFailure(failure); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isOne(); + clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1)); // ensure backoff still increases restartStrategy.notifyFailure(failure); assertThat(restartStrategy.canRestart()).isTrue(); assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L); } + @Test + void testMergeMultipleExceptionsIntoOneAttempt() { + ManualClock clock = new ManualClock(); + long initialBackoffMS = 2L; + double backoffMultiplier = 2.0d; + final long maxBackoffMS = 6L; + final long resetBackoffThresholdMS = 80L; + + final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = + new ExponentialDelayRestartBackoffTimeStrategy( + clock, + initialBackoffMS, + maxBackoffMS, + backoffMultiplier, + resetBackoffThresholdMS, + 0.d, + 3); + + // All exceptions merged into one attempt if the time is same. + long currentBackOffMs = initialBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs *= backoffMultiplier; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs = maxBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, and it reaches the maxAttempts. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + restartStrategy.notifyFailure(failure); + assertThat(restartStrategy.canRestart()).isFalse(); + } + + @Test + void testMergingExceptionsWorksWithResetting() { + ManualClock clock = new ManualClock(); + long initialBackoffMS = 2L; + double backoffMultiplier = 2.0d; + final long maxBackoffMS = 6L; + final long resetBackoffThresholdMS = 80L; + + final ExponentialDelayRestartBackoffTimeStrategy restartStrategy = + new ExponentialDelayRestartBackoffTimeStrategy( + clock, + initialBackoffMS, + maxBackoffMS, + backoffMultiplier, + resetBackoffThresholdMS, + 0.d, + 3); + + // Test the merging logic works well after a series of resetting. + for (int i = 0; i < 10; i++) { + // All exceptions merged into one attempt if the time is same. + long currentBackOffMs = initialBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs *= backoffMultiplier; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After advance time it's a new round, so new exception will be a new attempt. + clock.advanceTime(1, TimeUnit.MILLISECONDS); + currentBackOffMs = maxBackoffMS; + checkMultipleExceptionsAreMerged(clock, currentBackOffMs, restartStrategy); + + // After resetBackoffThresholdMS+1, the restartStrategy should be reset. + clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS); + } Review Comment: Thanks for the review! Updated~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org