This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a2fc5ef34f563c906473cbe4bdd79a9d7eec48e
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Tue Aug 9 17:53:04 2022 +0800

    [hotfix][runtime] Do last attempt without successfully canceling the retry 
timer to prevent unexpected incomplete element during finish phase in 
AsyncWaitOperator
    
    It is hard to reproduce this in runtime tests, but occasionally happens in 
AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry
 of FLINK-28849. It's better to add a separate test in runtime.
    
    This closes #20482
---
 .../flink/streaming/api/operators/async/AsyncWaitOperator.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index ba3f1c3ad87..0d88943b21e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -359,10 +359,10 @@ public class AsyncWaitOperator<IN, OUT>
             if (inFlightDelayRetryHandlers.size() > 0) {
                 for (RetryableResultHandlerDelegator delegator : 
inFlightDelayRetryHandlers) {
                     assert delegator.delayedRetryTimer != null;
-                    // cancel retry timer, cancel failure means retry action 
already being executed
-                    if (delegator.delayedRetryTimer.cancel(true)) {
-                        tryOnce(delegator);
-                    }
+                    // fire an attempt intermediately not rely on successfully 
canceling the retry
+                    // timer for two reasons: 1. cancel retry timer can not be 
100% safe 2. there's
+                    // protection for repeated retries
+                    tryOnce(delegator);
                 }
                 inFlightDelayRetryHandlers.clear();
             }

Reply via email to