Hi Jingsong,

Good question!

The delayQueue is very similar to incompleteElements in
UnorderedStreamElementQueue: it only holds references to the in-flight
elements that are waiting for a retry. Its core value is that it allows a
fast scan when all pending retries must be force-flushed during endInput,
and that it needs less refactoring of the existing logic.
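
A rough, hypothetical sketch of that idea, assuming java.util.concurrent.DelayQueue as in the pull-based PoC [3]. The names RetryDelayQueue, scheduleRetry, pollDue and flushAll are made up for illustration and are not the PoC's actual classes; the element type E stands in for an entry that also stays in the main element queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: names are illustrative, not classes from the FLIP or its PoC.
final class RetryDelayQueue<E> {

    /** A reference to an element that already lives in the main element queue, plus its retry deadline. */
    static final class DelayedRetry<E> implements Delayed {
        final E entry;          // reference only: the element itself stays in the main queue
        final long dueAtNanos;  // System.nanoTime() value at which the next retry is due

        DelayedRetry(E entry, long delayMillis) {
            this.entry = entry;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof DelayedRetry) {
                return Long.compare(dueAtNanos, ((DelayedRetry<?>) other).dueAtNanos);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedRetry<E>> pending = new DelayQueue<>();

    /** Registers an in-flight element whose next attempt should run after delayMillis. */
    void scheduleRetry(E entry, long delayMillis) {
        pending.offer(new DelayedRetry<>(entry, delayMillis));
    }

    /** Pull fashion: returns only the entries whose delay has already elapsed. */
    List<E> pollDue() {
        List<E> due = new ArrayList<>();
        DelayedRetry<E> head;
        while ((head = pending.poll()) != null) {
            due.add(head.entry);
        }
        return due;
    }

    /** endInput: no further input will arrive, so every pending retry is fired right away. */
    List<E> flushAll() {
        List<E> all = new ArrayList<>();
        for (DelayedRetry<E> r : pending) {  // the iterator also visits entries that are not yet due
            all.add(r.entry);
        }
        pending.clear();
        return all;
    }

    int size() {
        return pending.size();
    }
}
```

In this sketch the operator would poll pollDue() to fire retries whose delay has elapsed, and call flushAll() once in endInput. Because only references are kept, that scan touches just the elements waiting for a retry rather than the whole element queue.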

Users don't need to configure a separate capacity for the delayQueue; they
can just raise the existing capacity (if needed).
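
As a worked example of the sizing formula suggested in the earlier message quoted below ('inputRate * retryRate * avgRetryDuration', essentially Little's law applied to the elements waiting for a retry), here is a small hypothetical helper. The class name and the sample numbers are illustrative only, and it does not model the checkpoint duration that the thread also says to take into account.

```java
// Hypothetical helper; the formula is the one suggested in this thread.
final class RetryCapacityEstimate {

    /**
     * Estimates how many elements are waiting for a retry at any moment
     * (Little's law: arrival rate of retried elements times time spent retrying).
     *
     * @param inputRatePerSec     records per second arriving at one subtask
     * @param retryRate           fraction of records that need a retry, in [0, 1]
     * @param avgRetryDurationSec average time a retried record spends in retry, in seconds
     */
    static long estimate(double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        double inRetry = inputRatePerSec * retryRate * avgRetryDurationSec;
        return (long) Math.ceil(inRetry);  // round up: a fractional element still needs a slot
    }

    public static void main(String[] args) {
        // Example: 2,000 records/s per subtask, 5% of them retried, 3 s spent retrying on average.
        System.out.println(estimate(2_000, 0.05, 3.0));  // prints 300
    }
}
```

With these made-up numbers, about 300 elements would be waiting for a retry at any moment, which gives a sense of how much the existing capacity may need to be raised.
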
Storing the input data and the retry state separately is mainly for
backwards compatibility. In the first version of the PoC I used a single
combined state to reduce state costs, but that made compatibility hard to
maintain, so I split it into two after Yun Gao raised concerns about
compatibility.
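
A hypothetical sketch of why two states help, with no Flink dependency: the Snapshot class stands in for the operator's list states, and the field names and the index-based alignment between them are assumptions, not the PoC's actual layout. An older snapshot only contains the input-element state, which is read exactly as before; the missing retry state simply means no retries have happened yet.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: Snapshot stands in for the operator's list states;
// the field names and index-based alignment are assumptions for illustration.
final class RetryStateLayout {

    static final class Snapshot {
        final List<String> inputElements;   // same state and format as before retry support existed
        final List<Integer> retryAttempts;  // new, separate state; null when restoring an older snapshot

        Snapshot(List<String> inputElements, List<Integer> retryAttempts) {
            this.inputElements = inputElements;
            this.retryAttempts = retryAttempts;
        }
    }

    /** Returns one attempt counter per restored input element. */
    static List<Integer> restoreAttempts(Snapshot snapshot) {
        int n = snapshot.inputElements.size();
        if (snapshot.retryAttempts == null) {
            // Snapshot from a version without retry support: the input state is
            // read unchanged and every element simply starts with zero attempts.
            return new ArrayList<>(Collections.nCopies(n, 0));
        }
        if (snapshot.retryAttempts.size() != n) {
            throw new IllegalStateException("retry state does not line up with input state");
        }
        return new ArrayList<>(snapshot.retryAttempts);
    }
}
```

With a single combined state, the stored element format itself would change, so restoring an older snapshot would presumably require migrating its contents rather than just reading them.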

Best,
Lincoln Lee


On Mon, May 16, 2022 at 2:48 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks, Lincoln, for your reply.
>
> I'm a little confused about the relationship between the Ordered/Unordered
> queues and the DelayQueue. Why do we need a DelayQueue?
> Can we remove the DelayQueue and put the retry state in the
> StreamRecordQueueEntry (it seems this is already in the FLIP)?
> The advantages of doing this are:
> 1. half as much data is stored in state
> 2. the concept is unified: the user only needs to configure one queue
> capacity
>
> Best,
> Jingsong
>
> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
> > Hi Jingsong,
> > Thanks for your feedback! Let me try to answer the two questions:
> >
> > For q1: Motivation
> > Yes, users can implement retries themselves on top of the external async
> > client, but then every user has to write similar code; if we support
> > retries uniformly, user code becomes much simpler.
> >
> > > The real external call should happen in the asynchronous thread.
> > > My question is: if the user performs the retry in this asynchronous
> > > thread by themselves, is there a difference between this and the current
> > > FLIP's approach?
> >
> >
> > For q2: Block Main Thread
> > You're right, the queue data will be stored in a ListState, which is an
> > OperatorState. In theory a ListState can hold up to Integer.MAX_VALUE
> > elements, but in production we can't make the queue capacity too large,
> > because the risk of OOM grows with the capacity; when a single task has
> > too many elements to retry, increasing the task parallelism may be a more
> > viable option.
> > We recommend estimating the queue capacity with a formula like
> > 'inputRate * retryRate * avgRetryDuration', while also taking the actual
> > checkpoint duration at runtime into account.
> >
> > > If I understand correctly, the retry queue will be put into a ListState;
> > > is this state an OperatorState? As far as I know, OperatorState is not
> > > able to store a lot of data.
> > > So when we need to retry more data, do we need to block the main
> > > thread? What is the maximum size of the default retry queue?
> >
> >
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, May 16, 2022 at 10:31 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > > Thanks, Lincoln, for the proposal.
> > >
> > > ## Motivation:
> > >
> > > > asyncInvoke and callback functions are executed synchronously by the
> > > > main thread, which is not suitable for adding long-running blocking
> > > > operations, and introducing an additional thread will bring extra
> > > > complexity for users
> > >
> > > According to the documentation of AsyncFunction:
> > >
> > > > For each #asyncInvoke, an async io operation can be triggered, and once
> > > > it has been done, the result can be collected by calling {@link
> > > > ResultFuture#complete}. For each async operation, its context is stored in
> > > > the operator immediately after invoking #asyncInvoke, avoiding blocking for
> > > > each stream input as long as the internal buffer is not full.
> > >
> > > The real external call should happen in the asynchronous thread.
> > >
> > > My question is: if the user performs the retry in this asynchronous
> > > thread by themselves, is there a difference between this and the current
> > > FLIP's approach?
> > >
> > > ## Block Main Thread
> > >
> > > If I understand correctly, the retry queue will be put into a ListState;
> > > is this state an OperatorState? As far as I know, OperatorState is not
> > > able to store a lot of data.
> > > So when we need to retry more data, do we need to block the main
> > > thread? What is the maximum size of the default retry queue?
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
> > > wrote:
> > >
> > > > Dear Flink developers,
> > > >
> > > > I would like to open a discussion on FLIP-232 [1], an extension of
> > > > AsyncWaitOperator to support retries for the user's AsyncFunction.
> > > >
> > > > To do so, a new user interface will be added to define the trigger
> > > > condition for a retry and when to retry. Internally, a delayed retry
> > > > mechanism will be introduced.
> > > >
> > > > There is a PoC for this FLIP [2][3]; thanks to Yun Gao for the offline
> > > > discussions and valuable comments.
> > > > The new feature is backwards compatible: it can recover from state
> > > > generated by prior Flink versions, and if no retry strategy is enabled,
> > > > the behavior is the same as before.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > [2] based on timer trigger
> > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > > [3] based on DelayQueue with pull fashion
> > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > > >
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > >
> >
>
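
For comparison with the question raised in the thread (retrying inside the user's own asynchronous client), a hand-rolled retry typically looks something like the following. This is a generic sketch using only java.util.concurrent; ClientSideRetry and withRetry are hypothetical names, not part of Flink or the FLIP.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper (not a Flink or FLIP API): retries an async call on a
// user-managed scheduler, which users must build themselves without built-in support.
final class ClientSideRetry {

    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call,
            Predicate<T> shouldRetry,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, shouldRetry, maxAttempts, delayMillis, scheduler, 1, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            Predicate<T> shouldRetry,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService scheduler,
            int attemptNo,
            CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            // Retry on an exception or on a result the caller considers retryable.
            boolean retryable = error != null || shouldRetry.test(value);
            if (retryable && attemptNo < maxAttempts) {
                scheduler.schedule(
                        () -> attempt(call, shouldRetry, maxAttempts, delayMillis, scheduler, attemptNo + 1, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            } else if (error != null) {
                result.completeExceptionally(error);  // attempts exhausted: surface the last error
            } else {
                result.complete(value);  // success, or the last attempt's result once attempts run out
            }
        });
    }
}
```

Inside asyncInvoke, the returned future would then be forwarded to Flink's ResultFuture, e.g. with resultFuture.complete(Collections.singleton(value)). Every user who needs retries ends up writing and maintaining code of this shape, which is the duplication the FLIP aims to remove.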
