Thanks for the feedback @Till.

Yes, I agree as well that opening up or changing the AsyncWaitOperator
doesn't seem to be a necessity here.
I think creating an "AsyncFunctionBase" and making the current AsyncFunction
an extension of it, with some of the default behaviors Shuyi suggested,
seems to be a good starting point.
To some extent we could also provide some of the strategies discussed as
default building blocks, but I am not sure this is a must once we have the
"AsyncFunctionBase".

I will try to create a POC for the change, gather some feedback, and
see whether the abstract class offers too much or too little flexibility.
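
To make the idea concrete, here is a rough sketch of what a retrying base
class could look like. It is written against plain java.util.concurrent
rather than Flink so that it runs on its own; RetryingAsyncCall, attempt()
and invoke() are hypothetical names, not existing Flink API. In Flink, the
call to invoke() would presumably sit inside AsyncFunction#asyncInvoke, with
the outcome forwarded to the ResultFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the proposed "AsyncFunctionBase"; not Flink API. */
abstract class RetryingAsyncCall<IN, OUT> {
    private final int maxRetries;
    private final long retryDelayMillis;
    private final ScheduledExecutorService timer;

    RetryingAsyncCall(int maxRetries, long retryDelayMillis, ScheduledExecutorService timer) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.timer = timer;
    }

    /** A single asynchronous attempt; subclasses implement only this. */
    protected abstract CompletableFuture<OUT> attempt(IN input);

    /** Runs attempt() and retries failures after a fixed delay, up to maxRetries times. */
    final CompletableFuture<OUT> invoke(IN input) {
        CompletableFuture<OUT> result = new CompletableFuture<>();
        tryOnce(input, 0, result);
        return result;
    }

    private void tryOnce(IN input, int retriesSoFar, CompletableFuture<OUT> result) {
        CompletableFuture<OUT> attemptFuture;
        try {
            attemptFuture = attempt(input);
        } catch (RuntimeException e) {
            // Treat a synchronous throw like a failed asynchronous attempt.
            attemptFuture = new CompletableFuture<>();
            attemptFuture.completeExceptionally(e);
        }
        attemptFuture.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesSoFar >= maxRetries) {
                result.completeExceptionally(error); // retries exhausted
            } else {
                timer.schedule(() -> tryOnce(input, retriesSoFar + 1, result),
                        retryDelayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }
}

final class RetryingAsyncCallDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated rate-limited call: fails twice, then succeeds.
        RetryingAsyncCall<String, String> call = new RetryingAsyncCall<String, String>(3, 10, timer) {
            @Override
            protected CompletableFuture<String> attempt(String input) {
                CompletableFuture<String> f = new CompletableFuture<>();
                if (calls.incrementAndGet() < 3) {
                    f.completeExceptionally(new IllegalStateException("rate limited"));
                } else {
                    f.complete(input.toUpperCase());
                }
                return f;
            }
        };
        String out = call.invoke("ok").get(5, TimeUnit.SECONDS);
        System.out.println(out + " after " + calls.get() + " attempts"); // OK after 3 attempts
        timer.shutdown();
    }
}
```

The sketch keeps the retry counter in memory only, so it starts from zero
again after a job restart. Whether that is enough flexibility is exactly
what the POC should show.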

Best,
Rong

On Tue, Mar 19, 2019 at 10:32 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Sorry for joining the discussion so late. I agree that we could add some
> more syntactic sugar for handling failure cases. Looking at the existing
> interfaces, I think it should be fairly easy to create an abstract class
> AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the
> retry logic for asynchronous operations. I think it is not strictly
> necessary to change the AsyncWaitOperator to add this functionality.
>
> Cheers,
> Till
>
> On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <walter...@gmail.com> wrote:
>
>> Thanks for raising the concern @shuyi and the explanation @konstantin.
>>
>> From a glance at the Flink documentation, it seems users have full control
>> over the timeout behavior [1]. But unlike in the AsyncWaitOperator itself, it
>> is not straightforward to access the internal state of the operator to, for
>> example, put the message back into the async buffer with a retry tag. Thus, I
>> also think that providing a set of common timeout-handling strategies is a
>> good idea for Flink users, and it could be a very useful feature.
>>
>> Regarding the questions and concerns:
>> 1. Should the "retry counter" be reset or continue where it left
>> off?
>> - This is definitely a good point, as this counter might need to go into
>> the operator state if we decide to carry over the retry counter.
>> Functionality-wise I think it should be reset, because after a restart it
>> no longer represents the same transient state as at the time of failure.
>>
>> 2. When should AsyncDataStream.orderedWait() skip a record?
>> - I assume this should be configurable by the user. For example, we could
>> have additional properties for each strategy described by @shuyi, such as a
>> combination of:
>>   - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
>>
>> I've also created a JIRA ticket [2] for the discussion, please feel free
>> to share your thoughts and comments.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
>> [2] https://issues.apache.org/jira/browse/FLINK-11909
>>
>>
>>
>> On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Hi Shuyi,
>>>
>>> I am not sure. You could handle retries in the user code within
>>> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
>>> without using a DLQ as described in my original answer to William. On the
>>> other hand, I agree that it could be easier for the user and it is indeed a
>>> common scenario.
>>>
>>> Two follow up questions come to mind:
>>>
>>>    - When a Flink job fails and restarts, would you expect the "retry
>>>    counter" to be reset or to continue where it left off?
>>>    - With AsyncDataStream.orderedWait(), when would you expect a record to
>>>    be skipped? After the final timeout, or after the first timeout?
>>>
>>> Would you like to create a JIRA ticket [1] for this improvement with
>>> answers to the questions above, so that we can continue to discuss it there?
>>>
>>> Best,
>>>
>>> Konstantin
>>>
>>> [1]
>>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>>>
>>>
>>> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:
>>>
>>>> Hi Konstantin,
>>>>
>>>> (cc Till since he owns the code)
>>>>
>>>> For async-IO, IO failure and retry is a common & expected pattern. In
>>>> most use cases, users will need to deal with IO failure and retry.
>>>> Therefore, I think it's better to address the problem in Flink, rather than
>>>> having users implement custom logic in user code, for a better dev
>>>> experience. We have a similar problem in many of our use cases. To enable
>>>> backoff and retry, we need to put the failed message into a DLQ (another
>>>> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
>>>> is manual/cumbersome and requires setting up an extra Kafka topic.
>>>>
>>>> Can we add multiple strategies to handle async IO failure in the
>>>> AsyncWaitOperator? I propose the following strategies:
>>>>
>>>>
>>>>    - FAIL_OPERATOR (default & current behavior)
>>>>    - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to
>>>>    N times)
>>>>    - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>>>
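
As a rough illustration of how the three strategies above differ, the
sketch below maps each one to the delay before a given retry. The enum
AsyncRetryStrategy and its delayMillis method are hypothetical and do not
exist in Flink; the parameters maxRetries and baseMillis are assumptions
standing in for "N times" and the configurable interval.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of the three proposed strategies; not an existing Flink API. */
enum AsyncRetryStrategy {
    FAIL_OPERATOR,
    FIX_INTERVAL_RETRY,
    EXP_BACKOFF_RETRY;

    /**
     * Delay before retry number {@code attempt} (1-based), or empty when the
     * strategy does not retry (any more) and the failure should propagate.
     */
    OptionalLong delayMillis(int attempt, int maxRetries, long baseMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt is 1-based");
        }
        if (this == FAIL_OPERATOR || attempt > maxRetries) {
            return OptionalLong.empty();
        }
        if (this == FIX_INTERVAL_RETRY) {
            return OptionalLong.of(baseMillis);
        }
        // EXP_BACKOFF_RETRY: base, 2 * base, 4 * base, ...
        // The number of doublings is capped to keep the shift from overflowing.
        int doublings = Math.min(attempt - 1, 20);
        return OptionalLong.of(baseMillis << doublings);
    }
}

final class AsyncRetryStrategyDemo {
    public static void main(String[] args) {
        // With maxRetries = 3 and a 1 s base: delays of 1000, 2000 and 4000 ms,
        // then an empty value for the fourth attempt.
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println(
                    AsyncRetryStrategy.EXP_BACKOFF_RETRY.delayMillis(attempt, 3, 1_000));
        }
    }
}
```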
>>>> What do you guys think? Thanks a lot.
>>>>
>>>> Shuyi
>>>>
>>>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <
>>>> konstan...@ververica.com> wrote:
>>>>
>>>>> Hi William,
>>>>>
>>>>> The AsyncWaitOperator does not have such a setting. It is "merely" a
>>>>> wrapper around an asynchronous call, which provides integration with
>>>>> Flink's state & time management.
>>>>>
>>>>> I think the way to go would be to do the exponential back-off in the
>>>>> user code and set the timeout of the AsyncWaitOperator to the sum of the
>>>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
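
The arithmetic in the example above can be sketched as follows. The
BackoffSchedule helper is hypothetical and not part of Flink; it only
computes the per-attempt timeouts of a doubling back-off and their sum,
which is the minimum value the operator-level timeout would need.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: computes a doubling back-off schedule. */
final class BackoffSchedule {
    private BackoffSchedule() {}

    /** Per-attempt timeouts: initial, 2 * initial, 4 * initial, ... */
    static List<Long> timeoutsMillis(long initialMillis, int attempts) {
        List<Long> timeouts = new ArrayList<>();
        long current = initialMillis;
        for (int i = 0; i < attempts; i++) {
            timeouts.add(current);
            current *= 2;
        }
        return timeouts;
    }

    /** Sum of all per-attempt timeouts; the operator timeout must be at least this. */
    static long totalMillis(long initialMillis, int attempts) {
        long total = 0;
        for (long timeout : timeoutsMillis(initialMillis, attempts)) {
            total += timeout;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(timeoutsMillis(2_000, 4)); // [2000, 4000, 8000, 16000]
        System.out.println(totalMillis(2_000, 4));    // 30000
    }
}
```

For the 2s + 4s + 8s + 16s example this gives 30 s, so the timeout passed to
the async operator would have to be at least 30 s, plus whatever margin the
calls themselves need.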
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Is there a way to specify an exponential backoff strategy for when
>>>>>> async function calls fail?
>>>>>>
>>>>>> I have an async function that does web requests to a rate-limited
>>>>>> API. Can you handle that with settings on the async function call?
>>>>>>
>>>>>> Thanks,
>>>>>> William
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Konstantin Knauf | Solutions Architect
>>>>>
>>>>> +49 160 91394525
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Data Artisans GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>>
>>>>
>>>
>>
