Hi Jingsong,
Thanks for your feedback! Let me try to answer the two questions:

For q1: Motivation
Yes, users can implement retries themselves on top of the external async
client, but then every user has to write similar retry logic. If we support
retries uniformly in the operator, user code becomes much simpler.

> The real external call should happen in the asynchronous thread.
> My question is: If the user makes a retry in this asynchronous thread by
> themselves, is there a difference between this and the current FLIP's?
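To make the point concrete, here is a minimal sketch (plain Java, no Flink dependency) of the kind of retry logic each user would otherwise write around an external async client: re-issue the call after a fixed delay while it fails or its result is unsatisfactory, up to a maximum number of attempts. The class and method names (ManualAsyncRetry, withRetry, retryIf) and the fixed-delay policy are hypothetical and are not the interface proposed in the FLIP.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: user-side retry around an external async client.
final class ManualAsyncRetry {

    private ManualAsyncRetry() {}

    /**
     * Issues call and re-issues it after delayMs whenever it fails or its result
     * matches retryIf, for at most maxAttempts attempts in total. The returned
     * future completes with the last result or the last error.
     */
    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call,
            Predicate<T> retryIf,
            int maxAttempts,
            long delayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, retryIf, maxAttempts, 1, delayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            Predicate<T> retryIf,
            int maxAttempts,
            int attemptNo,
            long delayMs,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> pending;
        try {
            pending = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like an asynchronous one.
            pending = CompletableFuture.failedFuture(e);
        }
        pending.whenComplete((value, error) -> {
            boolean shouldRetry = error != null || retryIf.test(value);
            if (shouldRetry && attemptNo < maxAttempts) {
                // Schedule the next attempt instead of sleeping, so no thread is blocked.
                scheduler.schedule(
                        () -> attempt(call, retryIf, maxAttempts, attemptNo + 1, delayMs, scheduler, result),
                        delayMs,
                        TimeUnit.MILLISECONDS);
            } else if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
    }
}
```

Inside asyncInvoke, the returned future would then have to be bridged to ResultFuture#complete or ResultFuture#completeExceptionally, and the user also has to own the scheduler thread. Every user repeating this plumbing is the duplication a uniform retry mechanism would remove.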


For q2: Block Main Thread
You're right, the queue data will be stored in ListState, which is an
OperatorState. In theory, the upper limit of ListState storage is
Integer.MAX_VALUE, but in production we shouldn't make the queue capacity
too large, because the risk of OOM grows with the queue capacity. When a
single task encounters too many retry items, increasing the task
parallelism may be a more viable approach.
We recommend estimating a proper queue capacity with a formula like
'inputRate * retryRate * avgRetryDuration', also taking the actual
checkpoint duration at runtime into account.
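As a rough illustration of that estimate, here is a small Java sketch. It only models the 'inputRate * retryRate * avgRetryDuration' product, i.e. the number of elements expected to be waiting for retry at any moment; it does not account for checkpoint duration. The units (records per second, seconds), the per-subtask variant's assumption that input is spread evenly across subtasks, and the class and method names are all hypothetical.

```java
// Hypothetical helper for the capacity estimate 'inputRate * retryRate * avgRetryDuration'.
final class RetryQueueCapacity {

    private RetryQueueCapacity() {}

    /**
     * Expected number of elements waiting for retry at any moment on one subtask.
     *
     * @param inputRate           records per second reaching the subtask (assumed unit)
     * @param retryRate           fraction of records that need a retry, in [0, 1]
     * @param avgRetryDurationSec average seconds an element stays in the retry queue (assumed unit)
     */
    static long estimate(double inputRate, double retryRate, double avgRetryDurationSec) {
        if (inputRate < 0 || avgRetryDurationSec < 0) {
            throw new IllegalArgumentException("rate and duration must be non-negative");
        }
        if (retryRate < 0 || retryRate > 1) {
            throw new IllegalArgumentException("retryRate must be in [0, 1]");
        }
        // Round up to a whole number of queue slots.
        return (long) Math.ceil(inputRate * retryRate * avgRetryDurationSec);
    }

    /** Same estimate, assuming the total input is spread evenly over parallel subtasks. */
    static long estimatePerSubtask(
            double totalInputRate, int parallelism, double retryRate, double avgRetryDurationSec) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return estimate(totalInputRate / parallelism, retryRate, avgRetryDurationSec);
    }
}
```

For example, 2000 records/s with a 25% retry rate and a 3 s average retry duration gives 2000 * 0.25 * 3 = 1500. Spreading the same input over 4 subtasks brings each subtask's estimate down to 375, which is why raising parallelism can be preferable to enlarging a single queue.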

> If I understand correctly, the retry queue will be put into ListState, this
> state is OperatorState? As far as I know, OperatorState does not have the
> ability to store a lot of data.
> So after we need to retry more data, we should need to block the main
> thread? What is the maximum size of the default retry queue?



Best,
Lincoln Lee


On Mon, May 16, 2022 at 10:31 Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks, Lincoln, for the proposal.
>
> ## Motivation:
>
> > asyncInvoke and callback functions are executed synchronously by the main
> > thread, which is not suitable for adding long blocking operations, and
> > introducing an additional thread will bring extra complexity for users
>
> According to the documentation of AsyncFunction:
>
> > For each #asyncInvoke, an async io operation can be triggered, and once
> > it has been done, the result can be collected by calling {@link
> > ResultFuture#complete}. For each async operation, its context is stored in
> > the operator immediately after invoking #asyncInvoke, avoiding blocking for
> > each stream input as long as the internal buffer is not full.
>
> The real external call should happen in the asynchronous thread.
>
> My question is: If the user makes a retry in this asynchronous thread by
> themselves, is there a difference between this and the current FLIP's?
>
> ## Block Main Thread
>
> If I understand correctly, the retry queue will be put into ListState, this
> state is OperatorState? As far as I know, OperatorState does not have the
> ability to store a lot of data.
> So after we need to retry more data, we should need to block the main
> thread? What is the maximum size of the default retry queue?
>
> Best,
> Jingsong
>
> On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
> > Dear Flink developers,
> >
> > I would like to open a discussion on FLIP 232 [1], an extension of
> > AsyncWaitOperator to support retries for the user's AsyncFunction.
> >
> > To do so, a new user interface will be added to define the trigger
> > condition for retry and when to retry. Internally, a delayed retry
> > mechanism will be introduced.
> >
> > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for the offline
> > discussions and valuable comments.
> > The new feature is backwards compatible: it can recover from state
> > generated by prior Flink versions, and if no retry strategy is enabled,
> > the behavior is the same as before.
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > [2] based on timer trigger
> > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > [3] based on DelayQueue with pull fashion
> > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> >
> >
> > Best,
> > Lincoln Lee
> >
>
