Hi Zakelly,

I've thought about it a bit more, and I think only the `#execute()` methods
make sense to use when implementing operators (and interruptible mails),
so I will add `MailOptions` parameters only to them. If necessary, we can
add more in the future.
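
For illustration only, here is a rough, self-contained sketch of how `MailOptions` on the `execute()` family could look. It is not the FLIP code: `MailOptionsSketch`, `SketchMailboxExecutor` and `RecordingExecutor` are made-up names, `ThrowingRunnable` is redeclared locally as a stand-in for the Flink interface, and the toy implementation only records mails instead of enqueuing them.

```java
import java.util.ArrayList;
import java.util.List;

class MailOptionsSketch {

    /** Local stand-in for Flink's ThrowingRunnable so the sketch compiles on its own. */
    @FunctionalInterface
    interface ThrowingRunnable<E extends Throwable> {
        void run() throws E;
    }

    /** Options attached to a mail; only the "interruptible" flag is modelled here. */
    static final class MailOptions {
        private boolean interruptible;

        MailOptions setIsInterruptible() {
            this.interruptible = true;
            return this;
        }

        boolean isInterruptible() {
            return interruptible;
        }
    }

    /** Hypothetical, trimmed-down executor: only execute() variants accept MailOptions. */
    interface SketchMailboxExecutor {
        // Shared default instance; callers should not mutate it.
        MailOptions DEFAULT_OPTIONS = new MailOptions();
        Object[] EMPTY_ARGS = new Object[0];

        default void execute(ThrowingRunnable<? extends Exception> command, String description) {
            execute(DEFAULT_OPTIONS, command, description, EMPTY_ARGS);
        }

        default void execute(
                MailOptions options,
                ThrowingRunnable<? extends Exception> command,
                String description) {
            execute(options, command, description, EMPTY_ARGS);
        }

        void execute(
                MailOptions options,
                ThrowingRunnable<? extends Exception> command,
                String descriptionFormat,
                Object... descriptionArgs);
    }

    /** Toy implementation that records each submitted mail instead of enqueuing it. */
    static final class RecordingExecutor implements SketchMailboxExecutor {
        final List<String> log = new ArrayList<>();

        @Override
        public void execute(
                MailOptions options,
                ThrowingRunnable<? extends Exception> command,
                String descriptionFormat,
                Object... descriptionArgs) {
            log.add(String.format(descriptionFormat, descriptionArgs)
                    + " interruptible=" + options.isInterruptible());
        }
    }

    public static void main(String[] args) {
        RecordingExecutor executor = new RecordingExecutor();
        executor.execute(() -> {}, "plain");
        executor.execute(new MailOptions().setIsInterruptible(), () -> {}, "timers");
        executor.log.forEach(System.out::println);
    }
}
```

Existing callers keep using the overloads without options, and only code that wants interruptible mails has to pass `new MailOptions().setIsInterruptible()`.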

I have updated the FLIP. If it looks good to you, I will start a voting
thread today or tomorrow.

Best,
Piotrek

On Thu, 23 May 2024 at 09:00, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotrek,
>
> Well, compared to this plan, I prefer your previous one, which is more in
> line with the intuition for an executor's API: calling `execute` directly.
> Before the number of variants grows too large, I'd suggest we make only the
> minimal change needed for "interruptible".
>
> My original thinking was that doubling each method could leave the new
> methods without callers. But as you said, for the sake of completeness,
> I can accept the plan of doubling the methods.
>
>
> Thanks & Best,
> Zakelly
>
> On Wed, May 22, 2024 at 5:05 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi Zakelly,
> >
> > > I suggest not doubling the existing methods. Only providing the following
> > > one is enough
> >
> > In that case I would prefer to have a complete set of methods, for the
> > sake of completeness. If the number of variants becomes too large, we
> > could convert the class into a builder?
> >
> >
> > mailboxExecutor.execute(myThrowingRunnable)
> >         .setInterruptible()
> >         .description("bla %d")
> >         .arg(42)
> >         .submit();
> >
> > It could be done either in the future, if we ever need to add even more
> > methods, or I could do it now. WDYT?
> >
> > Best,
> > Piotrek
> >
> > On Wed, 22 May 2024 at 08:48, Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Piotrek,
> > >
> > > `MailOptions` looks good to me. I suggest not doubling the existing
> > > methods. Only providing the following one is enough:
> > >
> > > void execute(
> > >     MailOptions mailOptions,
> > >     ThrowingRunnable<? extends Exception> command,
> > >     String descriptionFormat,
> > >     Object... descriptionArgs);
> > >
> > >
> > > WDYT?
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > >
> > > On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski <pnowoj...@apache.org>
> > > wrote:
> > >
> > > > Hi Zakelly and others,
> > > >
> > > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > > > continuation mail will return true. The FLIP-425 will leverage this queue
> > > > > to execute some state requests, and when the cp arrives, the operator may
> > > > > call `yield()` to drain. It may happen that the continuation mail is called
> > > > > again in `yield()`. By checking `isInterruptable()`, we can skip this mail
> > > > > and re-enqueue.
> > > >
> > > > Do you have some suggestions on how `isInterruptible` should be defined?
> > > > Do we have to double the number of methods in the `MailboxExecutor` to
> > > > provide versions of the existing methods that would enqueue "interruptible"
> > > > versions of mails? Something like:
> > > >
> > > >     default void execute(
> > > >             ThrowingRunnable<? extends Exception> command,
> > > >             String description) {
> > > >         execute(DEFAULT_OPTIONS, command, description);
> > > >     }
> > > >
> > > >     default void execute(
> > > >             MailOptions options,
> > > >             ThrowingRunnable<? extends Exception> command,
> > > >             String description) {
> > > >         execute(options, command, description, EMPTY_ARGS);
> > > >     }
> > > >
> > > >     default void execute(
> > > >             ThrowingRunnable<? extends Exception> command,
> > > >             String descriptionFormat,
> > > >             Object... descriptionArgs) {
> > > >         execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
> > > >     }
> > > >
> > > >     void execute(
> > > >             MailOptions options,
> > > >             ThrowingRunnable<? extends Exception> command,
> > > >             String descriptionFormat,
> > > >             Object... descriptionArgs);
> > > >
> > > >     public static class MailOptions {
> > > >         (...)
> > > >         public MailOptions() {
> > > >         }
> > > >
> > > >         MailOptions setIsInterruptible() {
> > > >             this.isInterruptible = true;
> > > >             return this;
> > > >         }
> > > >     }
> > > >
> > > > And usage would be like this:
> > > >
> > > > mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> { foo(); }, "foo");
> > > >
> > > > ?
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > On Thu, 16 May 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > > we checked in the firing timers benchmark [1] and we didn't observe any
> > > > > > performance regression.
> > > > >
> > > > > Thanks for the feedback, it's good news to hear that. I didn't notice
> > > > > we already have fireProcessingTimers benchmark.
> > > > >
> > > > > If so, we can follow it after this FLIP is merged.
> > > > >
> > > > > +1 for this FLIP.
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > > > On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Zakelly,
> > > > > >
> > > > > > > I'm suggesting skipping the continuation mail during draining of async
> > > > > > > state access.
> > > > > >
> > > > > > I see. That makes sense to me now. I will update the FLIP later.
> > > > > >
> > > > > > > the code path will become more complex after this FLIP
> > > > > > > due to the addition of shouldInterrupt() checks, right?
> > > > > >
> > > > > > Yes, that's correct.
> > > > > >
> > > > > > > If so, it's better to add a benchmark to check whether the job
> > > > > > > performance regresses when one job has a lot of timers.
> > > > > > > If the performance regresses too much, we need to re-consider it.
> > > > > > > Of course, I hope the performance is fine.
> > > > > >
> > > > > > I had the same concerns when David Moravek initially proposed this
> > > > > > solution, but we checked in the firing timers benchmark [1] and we didn't
> > > > > > observe any performance regression.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > [1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers&env=3
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, 7 May 2024 at 09:47, Rui Fan <1996fan...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > Overall this FLIP is fine for me. I have a minor concern:
> > > > > > > IIUC, the code path will become more complex after this FLIP
> > > > > > > due to the addition of shouldInterrupt() checks, right?
> > > > > > >
> > > > > > > If so, it's better to add a benchmark to check whether the job
> > > > > > > performance regresses when one job has a lot of timers.
> > > > > > > If the performance regresses too much, we need to re-consider it.
> > > > > > > Of course, I hope the performance is fine.
> > > > > > >
> > > > > > > Best,
> > > > > > > Rui
> > > > > > >
> > > > > > > On Mon, May 6, 2024 at 6:30 PM Zakelly Lan <zakelly....@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > I mean the scenario where things happen in the following order:
> > > > > > > > 1. advance watermark and process timers.
> > > > > > > > 2. the cp arrives and interrupts the timer processing, after which the
> > > > > > > > continuation mail is in the mailbox queue.
> > > > > > > > 3. `snapshotState` is called, where the async state access responses will
> > > > > > > > be drained by calling `tryYield()` [1]. What if the continuation mail is
> > > > > > > > triggered by `tryYield()`?
> > > > > > > >
> > > > > > > > I'm suggesting skipping the continuation mail during draining of async
> > > > > > > > state access.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1] https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Zakelly,
> > > > > > > > >
> > > > > > > > > Can you elaborate a bit more on what you have in mind? How does marking
> > > > > > > > > mails as interruptible help here? If an incoming async state access
> > > > > > > > > response comes, it could just request to interrupt any currently ongoing
> > > > > > > > > computations, regardless of whether the currently executed mail is
> > > > > > > > > interruptible or not.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Piotrek
> > > > > > > > >
> > > > > > > > > On Mon, 6 May 2024 at 06:33, Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Piotr,
> > > > > > > > > >
> > > > > > > > > > Thanks for the improvement, overall +1 for this. I'd leave a minor comment:
> > > > > > > > > >
> > > > > > > > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > > > > > > > > continuation mail will return true. The FLIP-425 will leverage this queue
> > > > > > > > > > to execute some state requests, and when the cp arrives, the operator may
> > > > > > > > > > call `yield()` to drain. It may happen that the continuation mail is called
> > > > > > > > > > again in `yield()`. By checking `isInterruptable()`, we can skip this mail
> > > > > > > > > > and re-enqueue.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Zakelly
> > > > > > > > > >
> > > > > > > > > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for your answers, Piotrek. I got it now. +1 for this improvement.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yanfei
> > > > > > > > > > >
> > > > > > > > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Tue, Apr 30, 2024 at 21:30:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Stefan
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan <ro...@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the proposal, I definitely see the need for this improvement, +1.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Roman
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hi Yanfei,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks for the feedback!
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > > > > > > > > > > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > > > > > > > > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > > > > > > > > > > > >>> is the watermark sent downstream?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> The watermark would be outputted by an operator only once all relevant
> > > > > > > > > > > > >> timers are fired.
> > > > > > > > > > > > >> In other words, if firing of timers is interrupted, a continuation mail to
> > > > > > > > > > > > >> continue firing those interrupted timers is created. The watermark will be
> > > > > > > > > > > > >> emitted downstream at the end of that continuation mail.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > > > > > > > > > > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Yes, the firing of both processing time and event time timers shares the
> > > > > > > > > > > > >> same code, and both will support interruptions in the same way. Actually
> > > > > > > > > > > > >> I've renamed the FLIP from
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> Interruptible watermarks processing
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> to:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> Interruptible timers firing
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> to make this more clear.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Best,
> > > > > > > > > > > > >> Piotrek
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Tue, 30 Apr 2024 at 06:08, Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> Hi Piotrek,
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Thanks for this proposal. It looks like it will shorten the checkpoint
> > > > > > > > > > > > >>> duration, especially in the case of back pressure. +1 for it! I'd
> > > > > > > > > > > > >>> like to ask some questions to understand your thoughts more precisely.
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > > > > > > > > > > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > > > > > > > > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > > > > > > > > > > > >>> is the watermark sent downstream?
> > > > > > > > > > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > > > > > > > > > > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Best regards,
> > > > > > > > > > > > >>> Yanfei
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Piotr Nowojski <pnowoj...@apache.org> wrote on Mon, Apr 29, 2024 at 21:57:
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Hi all,
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> I would like to start a discussion on FLIP-443: Interruptible watermark
> > > > > > > > > > > > >>>> processing.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> https://cwiki.apache.org/confluence/x/qgn9EQ
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> This proposal tries to make Flink's subtask thread more responsive when
> > > > > > > > > > > > >>>> processing watermarks/firing timers, and make those operations
> > > > > > > > > > > > >>>> interruptible/break them apart into smaller steps. At the same time, the
> > > > > > > > > > > > >>>> proposed solution could be potentially adopted in other places in the code
> > > > > > > > > > > > >>>> base as well, to solve similar problems with other flatMap-like operators
> > > > > > > > > > > > >>>> (non windowed joins, aggregations, CepOperator, ...).
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> I'm looking forward to your thoughts.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Best,
> > > > > > > > > > > > >>>> Piotrek