Dear Flink Community,
I am using AsyncDataStream.unorderedWaitWithRetry together with a
RichAsyncFunction to perform asynchronous processing in Flink 1.19. I have
configured a retry strategy using FixedDelayRetryStrategy with a maximum of
3 retries and a delay of 1000 milliseconds.
*Issue:*
When the initial call to asyncInvoke() times out, the overridden timeout()
method is invoked as expected. However, when a retry is triggered (by a
timeout or by an exception) and that retry also exceeds the timeout,
timeout() is not called again. It appears that no timeout is registered for
retry attempts, so those attempts hang indefinitely.
*Observed Output:*
    CustomSend AsyncIO start...!
    Trying document at 1752058980697 --> customDoc://korea.test.test/12345
    Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
    Trying document at 1752058983701 --> customDoc://korea.test.test/12345
    // No further timeout messages appear
*Expected Output:*
Each retry should also be subject to timeout handling: if a retry does not
complete within the timeout period, timeout() should be called again. For
example:
    Trying document ...
    Timeout document ...
    Trying document again ...
    Timeout document ...
    (repeated up to the maximum number of retries)
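To make the difference between the two behaviours concrete, here is a small
plain-Java model of the retry loop. It is NOT Flink's AsyncWaitOperator: the
class RetryTimeoutModel and its parameters are hypothetical, every attempt is
assumed to hang (like my Thread.sleep(150000)), the timeout is treated as
per-attempt, and a manual clock drives the timers so the trace is
deterministic. With rearmPerAttempt = false only the first attempt can time
out and the retry hangs, which matches what I observe; with true, every
attempt times out until the retries are exhausted, which is what I expected.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, Flink-independent model of an async call with fixed-delay retries. */
public class RetryTimeoutModel {

    /** A callback scheduled at a virtual time; seq keeps equal-time timers in FIFO order. */
    private static final class Timer {
        final long fireAt;
        final long seq;
        final Runnable action;

        Timer(long fireAt, long seq, Runnable action) {
            this.fireAt = fireAt;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Timer> timers = new PriorityQueue<>(
            Comparator.<Timer>comparingLong(t -> t.fireAt).thenComparingLong(t -> t.seq));
    private final List<String> trace = new ArrayList<>();
    private final long timeoutMs;
    private final long delayMs;
    private final int maxRetries;
    private final boolean rearmPerAttempt;
    private long now = 0;
    private long seq = 0;
    private int attempts = 0;
    private boolean finished = false;

    public RetryTimeoutModel(long timeoutMs, long delayMs, int maxRetries, boolean rearmPerAttempt) {
        this.timeoutMs = timeoutMs;
        this.delayMs = delayMs;
        this.maxRetries = maxRetries;
        this.rearmPerAttempt = rearmPerAttempt;
    }

    private void schedule(long delay, Runnable action) {
        timers.add(new Timer(now + delay, seq++, action));
    }

    private void tryOnce() {
        attempts++;
        trace.add("Trying@" + now);
        // Observed: only the first attempt gets a timeout. Expected: every attempt gets one.
        if (rearmPerAttempt || attempts == 1) {
            schedule(timeoutMs, this::onTimeout);
        }
    }

    private void onTimeout() {
        trace.add("Timeout@" + now);
        // Like my timeout(): fail the attempt, which leads to a retry while retries remain.
        if (attempts - 1 < maxRetries) {
            schedule(delayMs, this::tryOnce);
        } else {
            finished = true;
            trace.add("Failed@" + now);
        }
    }

    /** Runs until no timers are left and returns the event trace. */
    public List<String> run() {
        tryOnce();
        while (!timers.isEmpty()) {
            Timer t = timers.poll();
            now = t.fireAt;
            t.action.run();
        }
        if (!finished) {
            trace.add("Hung@" + now); // an attempt is pending, but nothing will ever fire
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(new RetryTimeoutModel(1000, 1000, 3, false).run());
        System.out.println(new RetryTimeoutModel(1000, 1000, 3, true).run());
    }
}
```

With my settings (1000 ms timeout, 1000 ms delay, 3 retries) main() prints
[Trying@0, Timeout@1000, Trying@2000, Hung@2000] for the observed behaviour
and [Trying@0, Timeout@1000, Trying@2000, Timeout@3000, Trying@4000,
Timeout@5000, Trying@6000, Timeout@7000, Failed@7000] for the expected one.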
*Test Configuration Summary:*
- In asyncInvoke(), I simulate long processing with Thread.sleep(150000)
  inside a CompletableFuture to force timeouts.
- The timeout() method calls resultFuture.completeExceptionally(...).
- unorderedWaitWithRetry(...) configuration:
  - Timeout: 1000 ms
  - Retry strategy: fixed delay, 3 retries, 1000 ms delay
- Job parallelism: 1
*Workaround / Potential Fix:*
I tried modifying AsyncWaitOperator#tryOnce(...) to call
resultHandlerDelegator.registerTimeout(timeout) before each retry attempt.
This appears to resolve the issue: every retry now respects the timeout.
    private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
            throws Exception {
        // count this attempt for the input record
        resultHandlerDelegator.currentAttempts++;
        // proposed change: arm a timeout for this retry attempt
        // (only if a timeout is configured)
        if (timeout > 0L) {
            resultHandlerDelegator.registerTimeout(timeout);
        }
        userFunction.asyncInvoke(
                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
                resultHandlerDelegator);
    }
*My Questions:*
- Is this a known issue?
- Is it valid to call registerTimeout() inside tryOnce()?
- Could this introduce side effects, especially when tryOnce() is reached via
  finishInFlightDelayedRetry()?
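To make the side-effect question concrete: I have not checked whether
AsyncWaitOperator cancels a previously registered timeout timer when a new one
is registered, so here is a generic, Flink-independent illustration of the
hazard I am worried about (the class StaleTimeoutDemo is hypothetical). If
attempt 1 fails quickly with an exception and each attempt arms its own timeout
without cancelling the previous one, attempt 1's timer can still fire during
the retry delay and call timeout() for a record that is only waiting to be
retried. Tagging each timer with its attempt number and ignoring timers that no
longer belong to the current, pending attempt is one possible guard; cancelling
the old timer as soon as its attempt fails would be another.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration of a stale per-attempt timeout; not Flink code. */
public class StaleTimeoutDemo {

    private static final class Event {
        final long at;
        final long seq;
        final Runnable action;

        Event(long at, long seq, Runnable action) {
            this.at = at;
            this.seq = seq;
            this.action = action;
        }
    }

    private final PriorityQueue<Event> queue = new PriorityQueue<>(
            Comparator.<Event>comparingLong(e -> e.at).thenComparingLong(e -> e.seq));
    private final List<String> trace = new ArrayList<>();
    private final boolean guardStale;
    private long now = 0;
    private long seq = 0;
    private int currentAttempt = 0;
    private boolean attemptPending = false;

    public StaleTimeoutDemo(boolean guardStale) {
        this.guardStale = guardStale;
    }

    private void schedule(long delay, Runnable action) {
        queue.add(new Event(now + delay, seq++, action));
    }

    private void startAttempt(long timeoutMs) {
        final int attempt = ++currentAttempt;
        attemptPending = true;
        trace.add(now + ": attempt " + attempt + " started");
        // Arm a timeout tagged with the attempt it belongs to; nothing ever cancels it.
        schedule(timeoutMs, () -> onTimeout(attempt));
    }

    private void onTimeout(int attempt) {
        boolean stale = attempt != currentAttempt || !attemptPending;
        if (guardStale && stale) {
            trace.add(now + ": ignored stale timeout of attempt " + attempt);
            return;
        }
        attemptPending = false;
        trace.add(now + ": timeout() called for attempt " + attempt);
    }

    /** Attempt 1 fails with an exception at failAtMs; one retry starts retryDelayMs later. */
    public List<String> run(long timeoutMs, long failAtMs, long retryDelayMs) {
        startAttempt(timeoutMs);
        schedule(failAtMs, () -> {
            attemptPending = false;
            trace.add(now + ": attempt 1 failed, retrying in " + retryDelayMs + " ms");
            schedule(retryDelayMs, () -> startAttempt(timeoutMs));
        });
        while (!queue.isEmpty()) {
            Event e = queue.poll();
            now = e.at;
            e.action.run();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(new StaleTimeoutDemo(false).run(1000, 100, 1000));
        System.out.println(new StaleTimeoutDemo(true).run(1000, 100, 1000));
    }
}
```

Without the guard, the trace contains "1000: timeout() called for attempt 1"
even though attempt 1 had already failed at 100 ms and the retry only starts at
1100 ms; with the guard, that line becomes "1000: ignored stale timeout of
attempt 1", and attempt 2 still times out at 2100 ms.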
If this is confirmed to be a bug, I am happy to create a JIRA ticket and
submit a fix.
Thank you for your time and support.
Best regards,
Honggeun Ji