Thanks, Lincoln, the updated comments look good to me.

Best,
Jark

On Fri, 27 May 2022 at 14:21, Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jark & developers,
>
> I'm fine with this, and minor changes:
>
> "timeout from first invoke to final completion of asynchronous operation,
> may include multiple retries, and will be reset in case of failover"
>
> The FLIP[1] was updated including two changes:
> 1. generic type naming, use OUT instead of T
> 2. the new api's comments
>
> *And if no more new feedback, we will start a VOTE next monday.*
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
>
> Best,
> Lincoln Lee
>
>
> Jark Wu <imj...@gmail.com> 于2022年5月26日周四 23:10写道:
>
> > Hi Lincoln,
> >
> > What do you think about
> > "timeout for the asynchronous operation from the first invoke to finally
> > complete, which may across multiple retry attempts".
> >
> > Best,
> > Jark
> >
> > On Wed, 25 May 2022 at 20:29, Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
> >
> > > Hi Jark,
> > >
> > > Thanks for your feedback!
> > >
> > > for 2) good advice for the generic type naming, use OUT instead of T
> for
> > > the async scenario can be better.
> > >
> > > for 3) your concern makes sense to me, we should make the change more
> > > explicitly to users, especially the api itself (although the
> > documentation
> > > is necessary, it is not sufficient). And I didn't paste the complete
> > method
> > > signature into the FLIP.
> > > Now review the comments of the new method again, obviously it can not
> > > eliminate your confusion by just saying:
> > > '@param timeout for the asynchronous operation to complete include all
> > > reattempts.'
> > >
> > > The 'timeout' we want to clarify is that the user function finally
> > reaches
> > > the complete state, including all of the reattempts' time, and there is
> > no
> > > separate timeout for each attempt.
> > >
> > > In a worst case, if the first async request is stuck until the timeout,
> > > then enable retry will not improve (we discussed this scenario, in the
> > case
> > > of such a stuck, very probability the retry still stucks, and more
> > > importantly, there's no contract on freeing the resource for the
> stucked
> > > request for the user function, so we prefer to keep the behavior as it
> is
> > > now)
> > >
> > > Do you think it would be easier to understand if changes to:  '@param
> > > timeout for the asynchronous operation that finally complete, including
> > all
> > > reattempts and there is no separate timeout for each attempt.' ?
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Jark Wu <imj...@gmail.com> 于2022年5月25日周三 17:45写道:
> > >
> > > > Hi Lincoln,
> > > >
> > > > Thanks for proposing this retry feature for the async operator, this
> > > would
> > > > be very helpful for FLIP-234.
> > > > It's glad to see the vivid discussion, and the following are my
> > thoughts:
> > > >
> > > > 1) +1 w/o retry state.
> > > > It's very tricky and hard to implement a semantic exact state for
> retry
> > > > (currentAttemps and firstExecTime/costTime
> > > >  may not be enough). I think this might be overdesigned because most
> > > users
> > > > are fine with more retries when
> > > >  failover happens. Flink also doesn't provide the exact retry
> semantic
> > in
> > > > other places, e.g. "restart-strategy".
> > > >
> > > > 2) It confuses me what's the meaning of generic type <T>
> > > > of AsyncRetryStrategy and AsyncRetryPredicate.
> > > > It would be better to add an annotation description for it. In
> > addition,
> > > > maybe <OUT> would be better to keep
> > > > aligned with other async interfaces (e.g. AsyncFunction).
> > > >
> > > > 3) timeout parameter: total timeout vs. timeout per async operation
> > > > According to the Javadoc `AsyncDataStream#orderedWait/unorderedWait`,
> > the
> > > > "timeout" parameter is for
> > > > the asynchronous operation to complete, i.e. every call of
> > > > `AsyncFunction#asyncInvoke`. When we add a new
> > > > `orderedWaitWithRetry` method, I think we should keep the meaning of
> > > > "timeout" unchanged, otherwise,
> > > > we need a different parameter name and description.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Wed, 25 May 2022 at 15:00, Lincoln Lee <lincoln.8...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Gen Luo, Yun Gao and I had a long offline discussion about the
> > > > > implementation of the recovery part. The key point was should we
> > store
> > > > the
> > > > > retry state and do the recovery after the job restart?
> > > > >
> > > > > We reached a consensus not to store the retry state for now, which
> is
> > > the
> > > > > clearest for users and does not require any new changes to the
> > current
> > > > > recovery behavior.  We have discussed three possible options, the
> > > > behavior
> > > > > of these three options is identical in normal processing, the only
> > > > > difference lies in what retry state is recorded when do
> > checkpointing,
> > > > and
> > > > > what is the strategy when recovering.
> > > > >
> > > > > More details are updated into the FLIP[1], and the PoC[2] is also
> > > > updated.
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > [2] https://github.com/lincoln-lil/flink/tree/async-retry-poc
> > > > >
> > > > > Best,
> > > > > Lincoln Lee
> > > > >
> > > > >
> > > > > Lincoln Lee <lincoln.8...@gmail.com> 于2022年5月24日周二 12:23写道:
> > > > >
> > > > > > Hi Gen Luo,
> > > > > >
> > > > > > You're right, the total cost time include the failover-restart
> > time.
> > > So
> > > > > > when the failover time exceeds the retry timeout set by the user,
> > in
> > > > > fact,
> > > > > > all the data to be retry after recovery will have no additional
> > retry
> > > > > > opportunities, which is equivalent to normal data. In such
> > > > circumstances,
> > > > > > the retry state takes no effect. But not all jobs' restart is
> slow
> > > and
> > > > in
> > > > > > flink it is becoming more and more fast due the continuously
> > > > > improvements.
> > > > > > Hope this can help explaining your question.
> > > > > >
> > > > > > Best,
> > > > > > Lincoln Lee
> > > > > >
> > > > > >
> > > > > > Gen Luo <luogen...@gmail.com> 于2022年5月24日周二 11:50写道:
> > > > > >
> > > > > >> Hi Lincoln,
> > > > > >>
> > > > > >> Thanks for the explanation. I understand your thought, but I'm a
> > > > little
> > > > > >> confused by the additional detail.
> > > > > >> Is the startTime when the record is processed for the first
> time?
> > > And
> > > > > the
> > > > > >> cost time is counted based on it even after a job recovers from
> a
> > > > > failover
> > > > > >> or is restarted? For the failover case, the records may be
> > processed
> > > > > >> successfully when normally running, but after some time
> (probably
> > > > longer
> > > > > >> than the timeout) the job fails and restores, the records in the
> > > retry
> > > > > >> state will be timeout and discarded immediately. There's also
> same
> > > > > >> situation for the restarting case. I suppose in many cases the
> > > timeout
> > > > > >> will
> > > > > >> be less then the time a job may cost to restart, so in these
> cases
> > > the
> > > > > >> stored in-flight retry attempts will timeout immediately after
> the
> > > > > >> restarting, making the retry state meaningless. Please let me
> know
> > > if
> > > > I
> > > > > >> mistake somthing.
> > > > > >>
> > > > > >> Lincoln Lee <lincoln.8...@gmail.com> 于 2022年5月24日周二 10:20写道:
> > > > > >>
> > > > > >> > Thanks Gen Luo!
> > > > > >> >
> > > > > >> > Agree with you that prefer the simpler design.
> > > > > >> >
> > > > > >> > I’d like to share my thoughts on this choice: whether store
> the
> > > > retry
> > > > > >> state
> > > > > >> > or not only affect the recovery logic, not the per-record
> > > > processing,
> > > > > >> so I
> > > > > >> > just compare the two:
> > > > > >> > 1. w/ retry state:  simple recovery but lost precision
> > > > > >> > 2. w/o retry state: one more state and little complexly but
> > > precise
> > > > > for
> > > > > >> > users
> > > > > >> > I prefer the second one for the user perspective, the
> additional
> > > > > >> complexity
> > > > > >> > is manageable.
> > > > > >> >
> > > > > >> > One detail that not mentioned in the FLIP: we will check if
> any
> > > time
> > > > > >> left
> > > > > >> >  (now() - startTime > timeout) for next attempt, so the real
> > total
> > > > > >> attempts
> > > > > >> > will always less than or equal to maxAttempts and the total
> cost
> > > > time
> > > > > <=
> > > > > >> > timeout (one special case is job failover takes too long)
> > > > > >> >
> > > > > >> > For the api, I've updated the FLIP[1]
> > > > > >> >
> > > > > >> > [1]:
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > Lincoln Lee
> > > > > >> >
> > > > > >> >
> > > > > >> > Gen Luo <luogen...@gmail.com> 于2022年5月23日周一 16:54写道:
> > > > > >> >
> > > > > >> > > Hi Lincoln,
> > > > > >> > >
> > > > > >> > > Thanks for the quick reply.
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > 1. I understand when restarting a job with a savepoint, the
> > > retry
> > > > > >> state
> > > > > >> > can
> > > > > >> > > ensure the total retry attempts and delay is expected.
> > However,
> > > > when
> > > > > >> > > failover happens while a job is running, the remaining
> > attempts
> > > > > >> recorded
> > > > > >> > in
> > > > > >> > > the state are actually redid, and of course the total
> attempts
> > > are
> > > > > >> more
> > > > > >> > > than expected. The delay is indeed one of the concerns, but
> > I'm
> > > > > >> wondering
> > > > > >> > > whether the retry state kept here is really important to
> users
> > > or
> > > > > >> not. In
> > > > > >> > > my opinion its benefit is limited but it makes the change
> much
> > > > more
> > > > > >> > > complex. I would prefer a simpler solution, in which the
> retry
> > > > state
> > > > > >> is
> > > > > >> > > still possible to add if the need really arises in the
> future,
> > > > but I
> > > > > >> > > respect your decision.
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > 2. I think adding a currentAttempts parameter to the method
> is
> > > > good
> > > > > >> > enough.
> > > > > >> > >
> > > > > >> > > Lincoln Lee <lincoln.8...@gmail.com> 于 2022年5月23日周一
> 14:52写道:
> > > > > >> > >
> > > > > >> > > > Hi Gen Luo,
> > > > > >> > > >     Thanks a lot for your feedback!
> > > > > >> > > >
> > > > > >> > > > 1. About the retry state:
> > > > > >> > > > I considered dropping the retry state which really
> > simplifies
> > > > > state
> > > > > >> > > changes
> > > > > >> > > > and avoids compatibility handling. The only reason I
> changed
> > > my
> > > > > mind
> > > > > >> > was
> > > > > >> > > > that it might be lossy to the user. Elements that has been
> > > tried
> > > > > >> > several
> > > > > >> > > > times but not exhausted its retry opportunities will reset
> > the
> > > > > retry
> > > > > >> > > state
> > > > > >> > > > after a job failover-restart and start the retry process
> > again
> > > > (if
> > > > > >> the
> > > > > >> > > > retry condition persists true), which may cause a greater
> > > delay
> > > > > for
> > > > > >> the
> > > > > >> > > > retried elements, actually retrying more times and for
> > longer
> > > > than
> > > > > >> > > > expected. (Although in the PoC may also have a special
> case
> > > when
> > > > > >> > > > recovering: if the remaining timeout is exhausted for the
> > > > > >> > recalculation,
> > > > > >> > > it
> > > > > >> > > > will execute immediately but will have to register a
> timeout
> > > > timer
> > > > > >> for
> > > > > >> > > the
> > > > > >> > > > async, here using an extra backoffTimeMillis)
> > > > > >> > > > For example, '60s fixed-delay retry if empty result,
> > > > max-attempts:
> > > > > >> 5,
> > > > > >> > > > timeout 300s'
> > > > > >> > > > When checkpointing, some data has been retry 2 times, then
> > > > suppose
> > > > > >> the
> > > > > >> > > job
> > > > > >> > > > is restarted and it takes 2min when the restart succeeds,
> if
> > > we
> > > > > drop
> > > > > >> > the
> > > > > >> > > > retry state, the worst case will take more 240s(60s * 2 +
> > > 2min)
> > > > > >> delay
> > > > > >> > for
> > > > > >> > > > users to finish retry.
> > > > > >> > > >
> > > > > >> > > > For my understanding(please correct me if I missed
> > something),
> > > > if
> > > > > a
> > > > > >> job
> > > > > >> > > is
> > > > > >> > > > resumed from a previous state and the retry strategy is
> > > changed,
> > > > > the
> > > > > >> > > > elements that need to be recovered in the retry state just
> > > needs
> > > > > the
> > > > > >> > new
> > > > > >> > > > strategy to take over the current attempts and time that
> has
> > > > been
> > > > > >> used,
> > > > > >> > > or
> > > > > >> > > > give up retry if no retry strategy was set.
> > > > > >> > > > > and can be more compatible when the user restart a job
> > with
> > > a
> > > > > >> changed
> > > > > >> > > > retry strategy.
> > > > > >> > > >
> > > > > >> > > > 2.  About the interface, do you think it would be helpful
> if
> > > add
> > > > > the
> > > > > >> > > > currentAttempts into getBackoffTimeMillis()? e.g.,  long
> > > > > >> > > > getBackoffTimeMillis(int currentAttempts)
> > > > > >> > > > The existing RetryStrategy and RestartBackoffTimeStrategy
> > were
> > > > in
> > > > > my
> > > > > >> > > > candidate list but not exactly match, and I want to avoid
> > > > creating
> > > > > >> the
> > > > > >> > > new
> > > > > >> > > > instances for every attempt in RetryStrategy.
> > > > > >> > > >
> > > > > >> > > > WDYT?
> > > > > >> > > >
> > > > > >> > > > Best,
> > > > > >> > > > Lincoln Lee
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > Gen Luo <luogen...@gmail.com> 于2022年5月23日周一 11:37写道:
> > > > > >> > > >
> > > > > >> > > > > Thank Lincoln for the proposal!
> > > > > >> > > > >
> > > > > >> > > > > The FLIP looks good to me. I'm in favor of the timer
> based
> > > > > >> > > > implementation,
> > > > > >> > > > > and I'd like to share some thoughts.
> > > > > >> > > > >
> > > > > >> > > > > I'm thinking if we have to store the retry status in the
> > > > state.
> > > > > I
> > > > > >> > > suppose
> > > > > >> > > > > the retrying requests can just submit as the first
> attempt
> > > > when
> > > > > >> the
> > > > > >> > job
> > > > > >> > > > > restores from a checkpoint, since in fact the side
> effect
> > of
> > > > the
> > > > > >> > > retries
> > > > > >> > > > > can not draw back by the restoring. This makes the state
> > > > simpler
> > > > > >> and
> > > > > >> > > > makes
> > > > > >> > > > > it unnecessary to do the state migration, and can be
> more
> > > > > >> compatible
> > > > > >> > > when
> > > > > >> > > > > the user restart a job with a changed retry strategy.
> > > > > >> > > > >
> > > > > >> > > > > Besides, I find it hard to implement a flexible backoff
> > > > strategy
> > > > > >> with
> > > > > >> > > the
> > > > > >> > > > > current AsyncRetryStrategy interface, for example an
> > > > > >> > > > > ExponentialBackoffRetryStrategy. Maybe we can add a
> > > parameter
> > > > of
> > > > > >> the
> > > > > >> > > > > attempt or just use the
> > > > > >> > org.apache.flink.util.concurrent.RetryStrategy
> > > > > >> > > to
> > > > > >> > > > > take the place of the retry strategy part in the
> > > > > >> AsyncRetryStrategy?
> > > > > >> > > > >
> > > > > >> > > > > Lincoln Lee <lincoln.8...@gmail.com> 于 2022年5月20日周五
> > > 14:24写道:
> > > > > >> > > > >
> > > > > >> > > > > > Hi everyone,
> > > > > >> > > > > >
> > > > > >> > > > > >    By comparing the two internal implementations of
> > > delayed
> > > > > >> > retries,
> > > > > >> > > we
> > > > > >> > > > > > prefer the timer-based solution, which obtains precise
> > > delay
> > > > > >> > control
> > > > > >> > > > > > through simple logic and only needs to pay (what we
> > > consider
> > > > > to
> > > > > >> be
> > > > > >> > > > > > acceptable) timer instance cost for the retry element.
> > > The
> > > > > >> FLIP[1]
> > > > > >> > > doc
> > > > > >> > > > > has
> > > > > >> > > > > > been updated.
> > > > > >> > > > > >
> > > > > >> > > > > > [1]:
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > >> > > > > >
> > > > > >> > > > > > Best,
> > > > > >> > > > > > Lincoln Lee
> > > > > >> > > > > >
> > > > > >> > > > > >
> > > > > >> > > > > > Lincoln Lee <lincoln.8...@gmail.com> 于2022年5月16日周一
> > > 15:09写道:
> > > > > >> > > > > >
> > > > > >> > > > > > > Hi Jinsong,
> > > > > >> > > > > > >
> > > > > >> > > > > > > Good question!
> > > > > >> > > > > > >
> > > > > >> > > > > > > The delayQueue is very similar to incompleteElements
> > in
> > > > > >> > > > > > > UnorderedStreamElementQueue, it only records the
> > > > references
> > > > > of
> > > > > >> > > > > in-flight
> > > > > >> > > > > > > retry elements, the core value is for the ease of a
> > fast
> > > > > scan
> > > > > >> > when
> > > > > >> > > > > force
> > > > > >> > > > > > > flush during endInput and less refactor for existing
> > > > logic.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Users needn't configure a new capacity for the
> > > delayQueue,
> > > > > >> just
> > > > > >> > > turn
> > > > > >> > > > > the
> > > > > >> > > > > > > original one up (if needed).
> > > > > >> > > > > > > And separately store the input data and retry state
> is
> > > > > mainly
> > > > > >> to
> > > > > >> > > > > > implement
> > > > > >> > > > > > > backwards compatibility. The first version of Poc, I
> > > used
> > > > a
> > > > > >> > single
> > > > > >> > > > > > combined
> > > > > >> > > > > > > state in order to reduce state costs, but hard to
> keep
> > > > > >> > > compatibility,
> > > > > >> > > > > and
> > > > > >> > > > > > > changed  into two via Yun Gao's concern about the
> > > > > >> compatibility.
> > > > > >> > > > > > >
> > > > > >> > > > > > > Best,
> > > > > >> > > > > > > Lincoln Lee
> > > > > >> > > > > > >
> > > > > >> > > > > > >
> > > > > >> > > > > > > Jingsong Li <jingsongl...@gmail.com> 于2022年5月16日周一
> > > > 14:48写道:
> > > > > >> > > > > > >
> > > > > >> > > > > > >> Thanks  Lincoln for your reply.
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> I'm a little confused about the relationship
> between
> > > > > >> > > > Ordered/Unordered
> > > > > >> > > > > > >> Queue and DelayQueue. Why do we need to have a
> > > > DelayQueue?
> > > > > >> > > > > > >> Can we remove the DelayQueue and put the state of
> the
> > > > retry
> > > > > >> in
> > > > > >> > the
> > > > > >> > > > > > >> StreamRecordQueueEntry (seems like it's already in
> > the
> > > > > FLIP)
> > > > > >> > > > > > >> The advantages of doing this are:
> > > > > >> > > > > > >> 1. twice less data is stored in state
> > > > > >> > > > > > >> 2. the concept is unified, the user only needs to
> > > > configure
> > > > > >> one
> > > > > >> > > > queue
> > > > > >> > > > > > >> capacity
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> Best,
> > > > > >> > > > > > >> Jingsong
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <
> > > > > >> > > > lincoln.8...@gmail.com>
> > > > > >> > > > > > >> wrote:
> > > > > >> > > > > > >>
> > > > > >> > > > > > >> > Hi Jinsong,
> > > > > >> > > > > > >> > Thanks for your feedback! Let me try to answer
> the
> > > two
> > > > > >> > > questions:
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > For q1: Motivation
> > > > > >> > > > > > >> > Yes, users can implement retries themselves based
> > on
> > > > the
> > > > > >> > > external
> > > > > >> > > > > > async
> > > > > >> > > > > > >> > client, but this requires each user to do similar
> > > > things,
> > > > > >> and
> > > > > >> > if
> > > > > >> > > > we
> > > > > >> > > > > > can
> > > > > >> > > > > > >> > support retries uniformly, user code would become
> > > much
> > > > > >> > simpler.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > > The real external call should happen in the
> > > > > asynchronous
> > > > > >> > > thread.
> > > > > >> > > > > > >> > My question is: If the user makes a retry in this
> > > > > >> asynchronous
> > > > > >> > > > > thread
> > > > > >> > > > > > by
> > > > > >> > > > > > >> > themselves, is there a difference between this
> and
> > > the
> > > > > >> current
> > > > > >> > > > > FLIP's?
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > For q2: Block Main Thread
> > > > > >> > > > > > >> > You're right, the queue data will be stored in
> the
> > > > > >> ListState
> > > > > >> > > which
> > > > > >> > > > > is
> > > > > >> > > > > > an
> > > > > >> > > > > > >> > OperateState, though in fact, for ListState
> > storage,
> > > > the
> > > > > >> > > > theoretical
> > > > > >> > > > > > >> upper
> > > > > >> > > > > > >> > limit is Integer.MAX_VALUE, but we can't increase
> > the
> > > > > queue
> > > > > >> > > > capacity
> > > > > >> > > > > > too
> > > > > >> > > > > > >> > big in production because the risk of OOM
> increases
> > > > when
> > > > > >> the
> > > > > >> > > queue
> > > > > >> > > > > > >> capacity
> > > > > >> > > > > > >> > grows, and increases the task parallelism maybe a
> > > more
> > > > > >> viable
> > > > > >> > > way
> > > > > >> > > > > when
> > > > > >> > > > > > >> > encounter too many retry items for a single task.
> > > > > >> > > > > > >> > We recommend using a proper estimate of queue
> > > capacity
> > > > > >> based
> > > > > >> > on
> > > > > >> > > > the
> > > > > >> > > > > > >> formula
> > > > > >> > > > > > >> > like this: 'inputRate * retryRate *
> > > avgRetryDuration',
> > > > > and
> > > > > >> > also
> > > > > >> > > > the
> > > > > >> > > > > > >> actual
> > > > > >> > > > > > >> > checkpoint duration in runtime.
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > > If I understand correctly, the retry queue will
> > be
> > > > put
> > > > > >> into
> > > > > >> > > > > > ListState,
> > > > > >> > > > > > >> > this
> > > > > >> > > > > > >> > state is OperatorState? As far as I know,
> > > OperatorState
> > > > > >> does
> > > > > >> > not
> > > > > >> > > > > have
> > > > > >> > > > > > >> the
> > > > > >> > > > > > >> > ability to store a lot of data.
> > > > > >> > > > > > >> > So after we need to retry more data, we should
> need
> > > to
> > > > > >> block
> > > > > >> > the
> > > > > >> > > > > main
> > > > > >> > > > > > >> > thread? What is the maximum size of the default
> > retry
> > > > > >> queue?
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > Best,
> > > > > >> > > > > > >> > Lincoln Lee
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > Jingsong Li <jingsongl...@gmail.com>
> 于2022年5月16日周一
> > > > > >> 10:31写道:
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >> > > Thank Lincoln for the proposal.
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > ## Motivation:
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > > asyncInvoke and callback functions are
> executed
> > > > > >> > > synchronously
> > > > > >> > > > by
> > > > > >> > > > > > the
> > > > > >> > > > > > >> > main
> > > > > >> > > > > > >> > > thread, which is not suitable adding long time
> > > > blocking
> > > > > >> > > > > operations,
> > > > > >> > > > > > >> and
> > > > > >> > > > > > >> > > introducing additional thread will bring extra
> > > > > complexity
> > > > > >> > for
> > > > > >> > > > > users
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > According to the documentation of
> AsyncFunction:
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > > For each #asyncInvoke, an async io operation
> > can
> > > be
> > > > > >> > > triggered,
> > > > > >> > > > > and
> > > > > >> > > > > > >> once
> > > > > >> > > > > > >> > > it has been done, the result can be collected
> by
> > > > > calling
> > > > > >> > > {@link
> > > > > >> > > > > > >> > > ResultFuture#complete}. For each async
> operation,
> > > its
> > > > > >> > context
> > > > > >> > > is
> > > > > >> > > > > > >> stored
> > > > > >> > > > > > >> > in
> > > > > >> > > > > > >> > > the operator immediately after invoking
> > > #asyncInvoke,
> > > > > >> > avoiding
> > > > > >> > > > > > >> blocking
> > > > > >> > > > > > >> > for
> > > > > >> > > > > > >> > > each stream input as long as the internal
> buffer
> > is
> > > > not
> > > > > >> > full.
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > The real external call should happen in the
> > > > > asynchronous
> > > > > >> > > thread.
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > My question is: If the user makes a retry in
> this
> > > > > >> > asynchronous
> > > > > >> > > > > > thread
> > > > > >> > > > > > >> by
> > > > > >> > > > > > >> > > themselves, is there a difference between this
> > and
> > > > the
> > > > > >> > current
> > > > > >> > > > > > FLIP's?
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > ## Block Main Thread
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > If I understand correctly, the retry queue will
> > be
> > > > put
> > > > > >> into
> > > > > >> > > > > > ListState,
> > > > > >> > > > > > >> > this
> > > > > >> > > > > > >> > > state is OperatorState? As far as I know,
> > > > OperatorState
> > > > > >> does
> > > > > >> > > not
> > > > > >> > > > > > have
> > > > > >> > > > > > >> the
> > > > > >> > > > > > >> > > ability to store a lot of data.
> > > > > >> > > > > > >> > > So after we need to retry more data, we should
> > need
> > > > to
> > > > > >> block
> > > > > >> > > the
> > > > > >> > > > > > main
> > > > > >> > > > > > >> > > thread? What is the maximum size of the default
> > > retry
> > > > > >> queue?
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > Best,
> > > > > >> > > > > > >> > > Jingsong
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <
> > > > > >> > > > > lincoln.8...@gmail.com
> > > > > >> > > > > > >
> > > > > >> > > > > > >> > > wrote:
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> > > > Dear Flink developers,
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > I would like to open a discussion on FLIP 232
> > > [1],
> > > > > >> for an
> > > > > >> > > > > > >> extension of
> > > > > >> > > > > > >> > > > AsyncWaitOperator to support retry for user's
> > > > > >> > asyncFunction.
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > To do so, new user interface will added to
> > define
> > > > the
> > > > > >> > > trigger
> > > > > >> > > > > > >> condition
> > > > > >> > > > > > >> > > for
> > > > > >> > > > > > >> > > > retry and when should retry. Internally, a
> > > delayed
> > > > > >> retry
> > > > > >> > > > > mechanism
> > > > > >> > > > > > >> will
> > > > > >> > > > > > >> > > be
> > > > > >> > > > > > >> > > > introduced.
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > There's PoC for this FLIP [2][3], thanks Yun
> > Gao
> > > > for
> > > > > >> > offline
> > > > > >> > > > > > >> > discussions
> > > > > >> > > > > > >> > > > and valuable comments.
> > > > > >> > > > > > >> > > > The new feature is backwards compatible that
> > can
> > > > > >> recover
> > > > > >> > > from
> > > > > >> > > > > > state
> > > > > >> > > > > > >> > which
> > > > > >> > > > > > >> > > > was generated by prior flink versions, and if
> > no
> > > > > retry
> > > > > >> > > > strategy
> > > > > >> > > > > > >> enabled
> > > > > >> > > > > > >> > > the
> > > > > >> > > > > > >> > > > behavior is as before.
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > [1]
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >>
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > >> > > > > > >> > > > [2] based on timer trigger
> > > > > >> > > > > > >> > > >
> > > > > >> > > >
> > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > > > >> > > > > > >> > > > [3] based on DelayQueue with pull fashion
> > > > > >> > > > > > >> > > >
> > > > > >> > >
> https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > > > Best,
> > > > > >> > > > > > >> > > > Lincoln Lee
> > > > > >> > > > > > >> > > >
> > > > > >> > > > > > >> > >
> > > > > >> > > > > > >> >
> > > > > >> > > > > > >>
> > > > > >> > > > > > >
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to