Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Zakelly Lan
Hi Piotrek,

It looks good to me. Thanks for the update!


Best,
Zakelly


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Piotr Nowojski
Hi Zakelly,

I've thought about it a bit more, and I think the `#execute()` methods are
the only ones that make sense to use when implementing operators (and
interruptible mails), so I will add the `MailOptions` parameter only to
them. If necessary, we can add more in the future.

I have updated the FLIP. If it looks good to you, I would start a voting
thread today/tomorrow.

Best,
Piotrek


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Zakelly Lan
Hi Piotrek,

Well, compared to this plan, I prefer your previous one, which is more in
line with the intuition for an executor's API: calling `execute` directly.
Before the number of variants grows too large, I'd suggest we make only the
minimum change needed for "interruptible".

My original concern was that doubling each method could leave some of the
new methods without callers. But as you said, for the sake of completeness,
I could accept the plan of doubling the methods.


Thanks & Best,
Zakelly


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-22 Thread Piotr Nowojski
Hi Zakelly,

> I suggest not doubling the existing methods. Only providing the following
> one is enough

In that case I would prefer to have a complete set of the methods, for the
sake of completeness. If the number of variants gets to be too much, we
could convert the class into a builder?

    mailboxExecutor.execute(myThrowingRunnable)
            .setInterruptible()
            .description("bla %d")
            .arg(42)
            .submit();

This could be done in the future, if we ever need to add even more
methods, or I could do it now. WDYT?
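
For illustration only, the builder idea could look roughly like the sketch below. Everything in it (`MailBuilderSketch`, `SketchExecutor`, `MailBuilder`, `Mail`, and the local `ThrowingRunnable`) is a hypothetical stand-in written so the example compiles on its own; it is not the existing `MailboxExecutor` API and not necessarily what the FLIP would end up with.

```java
import java.util.ArrayList;
import java.util.List;

public class MailBuilderSketch {

    /** Local stand-in for a runnable that may throw a checked exception. */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Throwable> {
        void run() throws E;
    }

    /** A mail as it would sit in the queue once submitted (hypothetical shape). */
    public static final class Mail {
        public final ThrowingRunnable<? extends Exception> command;
        public final boolean interruptible;
        public final String description;

        Mail(ThrowingRunnable<? extends Exception> command,
                boolean interruptible,
                String description) {
            this.command = command;
            this.interruptible = interruptible;
            this.description = description;
        }
    }

    /** Collects the optional settings and enqueues the mail on submit(). */
    public static final class MailBuilder {
        private final List<Mail> queue;
        private final ThrowingRunnable<? extends Exception> command;
        private final List<Object> descriptionArgs = new ArrayList<>();
        private boolean interruptible;
        private String descriptionFormat = "";

        MailBuilder(List<Mail> queue, ThrowingRunnable<? extends Exception> command) {
            this.queue = queue;
            this.command = command;
        }

        public MailBuilder setInterruptible() {
            this.interruptible = true;
            return this;
        }

        public MailBuilder description(String format) {
            this.descriptionFormat = format;
            return this;
        }

        public MailBuilder arg(Object value) {
            this.descriptionArgs.add(value);
            return this;
        }

        /** Nothing is enqueued until this method is called. */
        public void submit() {
            String description =
                    String.format(descriptionFormat, descriptionArgs.toArray());
            queue.add(new Mail(command, interruptible, description));
        }
    }

    /** Hypothetical executor whose execute() returns a builder instead of enqueuing. */
    public static final class SketchExecutor {
        public final List<Mail> queue = new ArrayList<>();

        public MailBuilder execute(ThrowingRunnable<? extends Exception> command) {
            return new MailBuilder(queue, command);
        }
    }

    public static void main(String[] args) {
        SketchExecutor mailboxExecutor = new SketchExecutor();
        mailboxExecutor.execute(() -> {})
                .setInterruptible()
                .description("bla %d")
                .arg(42)
                .submit();
        Mail mail = mailboxExecutor.queue.get(0);
        System.out.println(mail.description + " interruptible=" + mail.interruptible);
    }
}
```

One trade-off visible in the sketch: a caller who forgets the final `submit()` silently enqueues nothing, which the overload-based variants cannot get wrong.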

Best,
Piotrek


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-22 Thread Zakelly Lan
Hi Piotrek,

`MailOptions` looks good to me. I suggest not doubling the existing
methods. Only providing the following one is enough:

    void execute(
            MailOptions mailOptions,
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs);


WDYT?


Best,
Zakelly



Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-21 Thread Piotr Nowojski
Hi Zakelly and others,

> 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> continuation mail will return true. The FLIP-425 will leverage this queue
> to execute some state requests, and when the cp arrives, the operator may
> call `yield()` to drain. It may happen that the continuation mail is
> called again in `yield()`. By checking `isInterruptable()`, we can skip
> this mail and re-enqueue.
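
The skip-and-re-enqueue behaviour quoted above can be pictured with the minimal sketch below. It is an illustration under assumptions, not Flink's actual mailbox code: `SkipInterruptibleDrainSketch`, its `Mail` class, and `drainSkippingInterruptible` are hypothetical names, and a plain `Deque` stands in for the real mailbox queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SkipInterruptibleDrainSketch {

    /** Hypothetical mail: a named action plus the proposed interruptible flag. */
    public static final class Mail {
        public final String name;
        public final boolean interruptible;
        public final Runnable action;

        public Mail(String name, boolean interruptible, Runnable action) {
            this.name = name;
            this.interruptible = interruptible;
            this.action = action;
        }
    }

    /**
     * Runs every queued mail that is not interruptible. Interruptible mails
     * (such as a continuation mail) are set aside and put back afterwards in
     * their original relative order. Returns how many mails were executed.
     */
    public static int drainSkippingInterruptible(Deque<Mail> mailbox) {
        List<Mail> skipped = new ArrayList<>();
        int executed = 0;
        Mail mail;
        while ((mail = mailbox.pollFirst()) != null) {
            if (mail.interruptible) {
                skipped.add(mail); // do not run it while draining
                continue;
            }
            mail.action.run();
            executed++;
        }
        mailbox.addAll(skipped); // re-enqueue for the normal processing loop
        return executed;
    }

    public static void main(String[] args) {
        Deque<Mail> mailbox = new ArrayDeque<>();
        mailbox.add(new Mail("continuation", true,
                () -> System.out.println("firing remaining timers")));
        mailbox.add(new Mail("state-response", false,
                () -> System.out.println("handling async state response")));

        int executed = drainSkippingInterruptible(mailbox);
        System.out.println("executed=" + executed + ", left in mailbox=" + mailbox.size());
    }
}
```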

Do you have some suggestions on how `isInterruptible` should be defined?
Do we have to double the number of methods in the `MailboxExecutor`, to
provide versions of the existing methods that would enqueue
"interruptible" versions of mails? Something like:

    default void execute(
            ThrowingRunnable<? extends Exception> command,
            String description) {
        execute(DEFAULT_OPTIONS, command, description);
    }

    default void execute(
            MailOptions options,
            ThrowingRunnable<? extends Exception> command,
            String description) {
        execute(options, command, description, EMPTY_ARGS);
    }

    default void execute(
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs) {
        execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
    }

    void execute(
            MailOptions options,
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs);

    public static class MailOptions {
        (...)
        public MailOptions() {
        }

        MailOptions setIsInterruptible() {
            this.isInterruptible = true;
            return this;
        }
    }

And usage would be like this:

    mailboxExecutor.execute(
            new MailOptions().setIsInterruptible(), () -> { foo(); }, "foo");

?
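
To check that the four overloads above resolve unambiguously, here is a minimal, self-contained sketch. `MailOptionsSketch`, `SketchMailboxExecutor`, and the local `ThrowingRunnable` are hypothetical stand-ins declared only for this example, and the field behind the elided `(...)` is assumed to be a plain `boolean`; the real interface may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class MailOptionsSketch {

    /** Local stand-in for a runnable that may throw a checked exception. */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Throwable> {
        void run() throws E;
    }

    /** Options attached to a mail; only the interruptible flag is modelled. */
    public static class MailOptions {
        private boolean isInterruptible; // assumed field, elided in the proposal

        public MailOptions setIsInterruptible() {
            this.isInterruptible = true;
            return this;
        }

        public boolean isInterruptible() {
            return isInterruptible;
        }
    }

    /** Hypothetical cut-down executor exposing the four overloads. */
    public interface SketchMailboxExecutor {
        // Shared default instance; callers must never call setters on it.
        MailOptions DEFAULT_OPTIONS = new MailOptions();
        Object[] EMPTY_ARGS = new Object[0];

        default void execute(
                ThrowingRunnable<? extends Exception> command, String description) {
            execute(DEFAULT_OPTIONS, command, description);
        }

        default void execute(
                MailOptions options,
                ThrowingRunnable<? extends Exception> command,
                String description) {
            execute(options, command, description, EMPTY_ARGS);
        }

        default void execute(
                ThrowingRunnable<? extends Exception> command,
                String descriptionFormat,
                Object... descriptionArgs) {
            execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
        }

        void execute(
                MailOptions options,
                ThrowingRunnable<? extends Exception> command,
                String descriptionFormat,
                Object... descriptionArgs);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // This executor only records what it was asked to enqueue.
        SketchMailboxExecutor executor =
                (options, command, format, formatArgs) ->
                        log.add(String.format(format, formatArgs)
                                + " interruptible=" + options.isInterruptible());

        executor.execute(() -> {}, "plain mail");
        executor.execute(new MailOptions().setIsInterruptible(), () -> {}, "mail %d", 42);
        log.forEach(System.out::println);
    }
}
```

Only the four-argument method is abstract, so an implementation has a single place to look at the options, while existing callers keep compiling against the overloads without `MailOptions`.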

Best,
Piotrek

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-16 Thread Rui Fan
Hi Piotr,

> we checked in the firing timers benchmark [1] and we didn't observe any
> performance regression.

Thanks for the feedback, that's good news. I hadn't noticed that
we already have a fireProcessingTimers benchmark.

If so, we can keep an eye on it after this FLIP is merged.

+1 for this FLIP.

Best,
Rui

On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski  wrote:

> Hi Zakelly,
>
> > I'm suggesting skipping the continuation mail during draining of async
> > state access.
>
> I see. That makes sense to me now. I will later update the FLIP.
>
> > the code path will become more complex after this FLIP
> > due to the addition of shouldInterrupt() checks, right?
>
> Yes, that's correct.
>
> > If so, it's better to add a benchmark to check whether the job
> > performance regresses when one job has a lot of timers.
> > If the performance regresses too much, we need to re-consider it.
> > Of course, I hope the performance is fine.
>
> I had the same concerns when initially David Moravek proposed this
> solution,
> but we checked in the firing timers benchmark [1] and we didn't observe any
> performance regression.
>
> Best,
> Piotrek
>
> [1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers=3
>
>
>
> On Tue, May 7, 2024 at 09:47 Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Overall this FLIP looks fine to me. I have a minor concern:
> > IIUC, the code path will become more complex after this FLIP
> > due to the addition of shouldInterrupt() checks, right?
> >
> > If so, it's better to add a benchmark to check whether the job
> > performance regresses when one job has a lot of timers.
> > If the performance regresses too much, we need to re-consider it.
> > Of course, I hope the performance is fine.
> >
> > Best,
> > Rui
> >
> > On Mon, May 6, 2024 at 6:30 PM Zakelly Lan wrote:
> >
> > > Hi Piotr,
> > >
> > > The scenario I have in mind is one where things happen in the
> > > following order:
> > > 1. The watermark advances and timers are processed.
> > > 2. The checkpoint (cp) arrives and interrupts the timer processing;
> > > after this, the continuation mail is in the mailbox queue.
> > > 3. `snapshotState` is called, where the async state access responses
> > > will be drained by calling `tryYield()` [1]. What if the continuation
> > > mail is triggered by `tryYield()`?
> > >
> > > I'm suggesting skipping the continuation mail during draining of async
> > > state access.
> > >
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
> > >
> > > Best,
> > > Zakelly
> > >
> > >
> > > On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Can you elaborate a bit more on what you have in mind? How marking
> > mails
> > > as
> > > > interruptible helps with something? If an incoming async state access
> > > > response comes, it could just request to interrupt any currently
> > ongoing
> > > > computations, regardless the currently executed mail is or is not
> > > > interruptible.
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > pon., 6 maj 2024 o 06:33 Zakelly Lan 
> > napisał(a):
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for the improvement, overall +1 for this. I'd leave a minor
> > > > comment:
> > > > >
> > > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and
> the
> > > > > continuation mail will return true. The FLIP-425 will leverage this
> > > queue
> > > > > to execute some state requests, and when the cp arrives, the
> operator
> > > may
> > > > > call `yield()` to drain. It may happen that the continuation mail
> is
> > > > called
> > > > > again in `yield()`. By checking `isInterruptable()`, we can skip
> this
> > > > mail
> > > > > and re-enqueue.
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei 
> > wrote:
> > > > >
> > > > > > Thanks for your answers, Piotrek. I got it now.  +1 for this
> > > > improvement.
> > > > > >
> > > > > > Best,
> > > > > > Yanfei
> > > > > >
> > > > > > Stefan Richter  于2024年4月30日周二
> > > 21:30写道:
> > > > > > >
> > > > > > >
> > > > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > > > >
> > > > > > > Best,
> > > > > > > Stefan
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan <
> ro...@apache.org
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks for the proposal, I definitely see the need for this
> > > > > > improvement, +1.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Roman
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <
> > > > pnowoj...@apache.org
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi Yanfei,
> > > > > > > >>
> > > > > > > >> Thanks for the feedback!
> > > > > > > >>
> > > > > > > 

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-16 Thread Piotr Nowojski
Hi Zakelly,

> I'm suggesting skipping the continuation mail during draining of async
> state access.

I see. That makes sense to me now. I will update the FLIP later.

> the code path will become more complex after this FLIP
> due to the addition of shouldInterrupt() checks, right?

Yes, that's correct.
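
For illustration only, here is a minimal sketch of the kind of check being discussed: a timer-firing loop that consults a `shouldInterrupt` flag after each fired timer and, if interrupted, enqueues a continuation mail and holds back the watermark. All names here (`InterruptibleTimerFiringSketch`, `runNextMail`, the in-memory mailbox) are assumptions made for the example and are not Flink classes or the FLIP's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch; not Flink's InternalTimerServiceImpl. */
final class InterruptibleTimerFiringSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final ArrayDeque<Runnable> mailbox = new ArrayDeque<>();
    private final List<Long> firedTimers = new ArrayList<>();
    private final List<Long> emittedWatermarks = new ArrayList<>();
    private final BooleanSupplier shouldInterrupt;

    InterruptibleTimerFiringSketch(BooleanSupplier shouldInterrupt) {
        this.shouldInterrupt = shouldInterrupt;
    }

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Fires due timers; returns false if firing was interrupted. */
    boolean advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            firedTimers.add(timers.poll());
            // The extra check: it runs after a timer has fired, so every
            // attempt makes progress even if interruption is requested.
            if (shouldInterrupt.getAsBoolean()) {
                // Continuation mail: resume firing the remaining timers later.
                mailbox.add(() -> advanceWatermark(watermark));
                return false;
            }
        }
        // The watermark goes downstream only once all relevant timers fired.
        emittedWatermarks.add(watermark);
        return true;
    }

    /** Runs the next queued mail, if any; returns false when the mailbox is empty. */
    boolean runNextMail() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }

    List<Long> firedTimers() {
        return firedTimers;
    }

    List<Long> emittedWatermarks() {
        return emittedWatermarks;
    }
}
```

In this sketch the added cost per timer is a single boolean check, which is the overhead the benchmark question below is about.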

> If so, it's better to add a benchmark to check whether the job
> performance regresses when one job has a lot of timers.
> If the performance regresses too much, we need to re-consider it.
> Of course, I hope the performance is fine.

I had the same concerns when David Moravek initially proposed this
solution, but we checked the firing timers benchmark [1] and didn't
observe any performance regression.

Best,
Piotrek

[1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers=3



wt., 7 maj 2024 o 09:47 Rui Fan <1996fan...@gmail.com> napisał(a):

> Hi Piotr,
>
> Overall this FLIP is fine for me. I have a minor concern:
> IIUC, the code path will become more complex after this FLIP
> due to the addition of shouldInterrupt() checks, right?
>
> If so, it's better to add a benchmark to check whether the job
> performance regresses when one job has a lot of timers.
> If the performance regresses too much, we need to re-consider it.
> Of course, I hope the performance is fine.
>
> Best,
> Rui
>
> On Mon, May 6, 2024 at 6:30 PM Zakelly Lan  wrote:
>
> > Hi Piotr,
> >
> > I'm saying the scenario where things happen in the following order:
> > 1. advance watermark and process timers.
> > 2. the cp arrives and interrupts the timer processing, after this the
> > continuation mail is in the mailbox queue.
> > 3. `snapshotState` is called, where the async state access responses will
> > be drained by calling `tryYield()` [1]. —— What if the continuation mail
> is
> > triggered by `tryYield()`?
> >
> > I'm suggesting skipping the continuation mail during draining of async
> > state access.
> >
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
> >
> > Best,
> > Zakelly
> >
> >
> > On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Can you elaborate a bit more on what you have in mind? How marking
> mails
> > as
> > > interruptible helps with something? If an incoming async state access
> > > response comes, it could just request to interrupt any currently
> ongoing
> > > computations, regardless the currently executed mail is or is not
> > > interruptible.
> > >
> > > Best,
> > > Piotrek
> > >
> > > pon., 6 maj 2024 o 06:33 Zakelly Lan 
> napisał(a):
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for the improvement, overall +1 for this. I'd leave a minor
> > > comment:
> > > >
> > > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > > continuation mail will return true. The FLIP-425 will leverage this
> > queue
> > > > to execute some state requests, and when the cp arrives, the operator
> > may
> > > > call `yield()` to drain. It may happen that the continuation mail is
> > > called
> > > > again in `yield()`. By checking `isInterruptable()`, we can skip this
> > > mail
> > > > and re-enqueue.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei 
> wrote:
> > > >
> > > > > Thanks for your answers, Piotrek. I got it now.  +1 for this
> > > improvement.
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > > > > Stefan Richter  于2024年4月30日周二
> > 21:30写道:
> > > > > >
> > > > > >
> > > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > > >
> > > > > > Best,
> > > > > > Stefan
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan  >
> > > > wrote:
> > > > > > >
> > > > > > > Thanks for the proposal, I definitely see the need for this
> > > > > improvement, +1.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Roman
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <
> > > pnowoj...@apache.org
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Yanfei,
> > > > > > >>
> > > > > > >> Thanks for the feedback!
> > > > > > >>
> > > > > > >>> 1. Currently when AbstractStreamOperator or
> > > > AbstractStreamOperatorV2
> > > > > > >>> processes a watermark, the watermark will be sent to
> > downstream,
> > > if
> > > > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is
> interrupted,
> > > > when
> > > > > > >>> is the watermark sent downstream?
> > > > > > >>
> > > > > > >> The watermark would be outputted by an operator only once all
> > > > relevant
> > > > > > >> timers are fired.
> > > > > > >> In other words, if firing of timers is interrupted a
> > continuation
> > > > > mail to
> > > > > > >> continue firing those
> > > > > > >> interrupted timers is created. Watermark will be emitted
> > > downstream
> > > 

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-07 Thread Rui Fan
Hi Piotr,

Overall this FLIP looks fine to me. I have a minor concern:
IIUC, the code path will become more complex after this FLIP
due to the addition of shouldInterrupt() checks, right?

If so, it would be good to add a benchmark to check whether job
performance regresses when a job has a lot of timers.
If the performance regresses too much, we need to reconsider it.
Of course, I hope the performance is fine.

Best,
Rui

On Mon, May 6, 2024 at 6:30 PM Zakelly Lan  wrote:

> Hi Piotr,
>
> I'm saying the scenario where things happen in the following order:
> 1. advance watermark and process timers.
> 2. the cp arrives and interrupts the timer processing, after this the
> continuation mail is in the mailbox queue.
> 3. `snapshotState` is called, where the async state access responses will
> be drained by calling `tryYield()` [1]. —— What if the continuation mail is
> triggered by `tryYield()`?
>
> I'm suggesting skipping the continuation mail during draining of async
> state access.
>
>
> [1]
>
> https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
>
> Best,
> Zakelly
>
>
> On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski 
> wrote:
>
> > Hi Zakelly,
> >
> > Can you elaborate a bit more on what you have in mind? How marking mails
> as
> > interruptible helps with something? If an incoming async state access
> > response comes, it could just request to interrupt any currently ongoing
> > computations, regardless the currently executed mail is or is not
> > interruptible.
> >
> > Best,
> > Piotrek
> >
> > pon., 6 maj 2024 o 06:33 Zakelly Lan  napisał(a):
> >
> > > Hi Piotr,
> > >
> > > Thanks for the improvement, overall +1 for this. I'd leave a minor
> > comment:
> > >
> > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > continuation mail will return true. The FLIP-425 will leverage this
> queue
> > > to execute some state requests, and when the cp arrives, the operator
> may
> > > call `yield()` to drain. It may happen that the continuation mail is
> > called
> > > again in `yield()`. By checking `isInterruptable()`, we can skip this
> > mail
> > > and re-enqueue.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:
> > >
> > > > Thanks for your answers, Piotrek. I got it now.  +1 for this
> > improvement.
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> > > > Stefan Richter  于2024年4月30日周二
> 21:30写道:
> > > > >
> > > > >
> > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > >
> > > > > Best,
> > > > > Stefan
> > > > >
> > > > >
> > > > >
> > > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan 
> > > wrote:
> > > > > >
> > > > > > Thanks for the proposal, I definitely see the need for this
> > > > improvement, +1.
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > > >
> > > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <
> > pnowoj...@apache.org
> > > > > wrote:
> > > > > >
> > > > > >> Hi Yanfei,
> > > > > >>
> > > > > >> Thanks for the feedback!
> > > > > >>
> > > > > >>> 1. Currently when AbstractStreamOperator or
> > > AbstractStreamOperatorV2
> > > > > >>> processes a watermark, the watermark will be sent to
> downstream,
> > if
> > > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> > > when
> > > > > >>> is the watermark sent downstream?
> > > > > >>
> > > > > >> The watermark would be outputted by an operator only once all
> > > relevant
> > > > > >> timers are fired.
> > > > > >> In other words, if firing of timers is interrupted a
> continuation
> > > > mail to
> > > > > >> continue firing those
> > > > > >> interrupted timers is created. Watermark will be emitted
> > downstream
> > > > at the
> > > > > >> end of that
> > > > > >> continuation mail.
> > > > > >>
> > > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into
> mail
> > > and
> > > > > >>> executed in mailbox. Is processing-timer allowed to be
> > interrupted?
> > > > > >>
> > > > > > >> Yes, both firing processing and event time timers share the same
> > code
> > > > and
> > > > > >> both will
> > > > > >> support interruptions in the same way. Actually I've renamed the
> > > FLIP
> > > > from
> > > > > >>
> > > > > >>> Interruptible watermarks processing
> > > > > >>
> > > > > >> to:
> > > > > >>
> > > > > >>> Interruptible timers firing
> > > > > >>
> > > > > >> to make this more clear.
> > > > > >>
> > > > > >> Best,
> > > > > >> Piotrek
> > > > > >>
> > > > > >> wt., 30 kwi 2024 o 06:08 Yanfei Lei 
> > > napisał(a):
> > > > > >>
> > > > > >>> Hi Piotrek,
> > > > > >>>
> > > > > >>> Thanks for this proposal. It looks like it will shorten the
> > > > checkpoint
> > > > > >>> duration, especially in the case of back pressure. +1 for it!
> > I'd
> > > > > >>> like to ask some questions to understand your thoughts more
> > > > 

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-06 Thread Zakelly Lan
Hi Piotr,

I mean the scenario where things happen in the following order:
1. The watermark advances and timers are processed.
2. The checkpoint (cp) arrives and interrupts the timer processing. After
this, the continuation mail is in the mailbox queue.
3. `snapshotState` is called, where the async state access responses will
be drained by calling `tryYield()` [1]. What if the continuation mail is
triggered by `tryYield()`?

I'm suggesting skipping the continuation mail during draining of async
state access.
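
To make the idea concrete, here is a minimal sketch, assuming a simplified in-memory mailbox. The names `DrainSkippingContinuationSketch`, `Mail`, and `tryYieldSkippingInterruptible` are hypothetical and only for illustration; they are not the existing `MailboxExecutor` API. In this sketch an interruptible mail is left in place rather than removed and re-enqueued, so its position in the queue is preserved.

```java
import java.util.ArrayDeque;
import java.util.Iterator;

/** Hypothetical sketch of draining a mailbox while skipping continuation mails. */
final class DrainSkippingContinuationSketch {

    /** A mail with a flag that marks it as an interruptible continuation. */
    static final class Mail {
        final String description;
        final boolean interruptible;
        final Runnable action;

        Mail(String description, boolean interruptible, Runnable action) {
            this.description = description;
            this.interruptible = interruptible;
            this.action = action;
        }
    }

    private final ArrayDeque<Mail> queue = new ArrayDeque<>();

    void put(Mail mail) {
        queue.addLast(mail);
    }

    int size() {
        return queue.size();
    }

    /**
     * Runs the first non-interruptible mail and returns true.
     * Interruptible mails stay queued, so a drain during snapshotState
     * never resumes timer firing. Returns false if nothing was runnable.
     */
    boolean tryYieldSkippingInterruptible() {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (!mail.interruptible) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }
}
```

A drain loop would then call `tryYieldSkippingInterruptible()` until it returns false, and the continuation mail would only run later from the normal mailbox loop.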


[1]
https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305

Best,
Zakelly


On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski  wrote:

> Hi Zakelly,
>
> Can you elaborate a bit more on what you have in mind? How marking mails as
> interruptible helps with something? If an incoming async state access
> response comes, it could just request to interrupt any currently ongoing
> computations, regardless the currently executed mail is or is not
> interruptible.
>
> Best,
> Piotrek
>
> pon., 6 maj 2024 o 06:33 Zakelly Lan  napisał(a):
>
> > Hi Piotr,
> >
> > Thanks for the improvement, overall +1 for this. I'd leave a minor
> comment:
> >
> > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > continuation mail will return true. The FLIP-425 will leverage this queue
> > to execute some state requests, and when the cp arrives, the operator may
> > call `yield()` to drain. It may happen that the continuation mail is
> called
> > again in `yield()`. By checking `isInterruptable()`, we can skip this
> mail
> > and re-enqueue.
> >
> >
> > Best,
> > Zakelly
> >
> > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:
> >
> > > Thanks for your answers, Piotrek. I got it now.  +1 for this
> improvement.
> > >
> > > Best,
> > > Yanfei
> > >
> > > Stefan Richter  于2024年4月30日周二 21:30写道:
> > > >
> > > >
> > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > >
> > > > Best,
> > > > Stefan
> > > >
> > > >
> > > >
> > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan 
> > wrote:
> > > > >
> > > > > Thanks for the proposal, I definitely see the need for this
> > > improvement, +1.
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <
> pnowoj...@apache.org
> > > > wrote:
> > > > >
> > > > >> Hi Yanfei,
> > > > >>
> > > > >> Thanks for the feedback!
> > > > >>
> > > > >>> 1. Currently when AbstractStreamOperator or
> > AbstractStreamOperatorV2
> > > > >>> processes a watermark, the watermark will be sent to downstream,
> if
> > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> > when
> > > > >>> is the watermark sent downstream?
> > > > >>
> > > > >> The watermark would be outputted by an operator only once all
> > relevant
> > > > >> timers are fired.
> > > > >> In other words, if firing of timers is interrupted a continuation
> > > mail to
> > > > >> continue firing those
> > > > >> interrupted timers is created. Watermark will be emitted
> downstream
> > > at the
> > > > >> end of that
> > > > >> continuation mail.
> > > > >>
> > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> > and
> > > > >>> executed in mailbox. Is processing-timer allowed to be
> interrupted?
> > > > >>
> > > > > >> Yes, both firing processing and event time timers share the same
> code
> > > and
> > > > >> both will
> > > > >> support interruptions in the same way. Actually I've renamed the
> > FLIP
> > > from
> > > > >>
> > > > >>> Interruptible watermarks processing
> > > > >>
> > > > >> to:
> > > > >>
> > > > >>> Interruptible timers firing
> > > > >>
> > > > >> to make this more clear.
> > > > >>
> > > > >> Best,
> > > > >> Piotrek
> > > > >>
> > > > >> wt., 30 kwi 2024 o 06:08 Yanfei Lei 
> > napisał(a):
> > > > >>
> > > > >>> Hi Piotrek,
> > > > >>>
> > > > >>> Thanks for this proposal. It looks like it will shorten the
> > > checkpoint
> > > > >>> duration, especially in the case of back pressure. +1 for it!
> I'd
> > > > >>> like to ask some questions to understand your thoughts more
> > > precisely.
> > > > >>>
> > > > >>> 1. Currently when AbstractStreamOperator or
> > AbstractStreamOperatorV2
> > > > >>> processes a watermark, the watermark will be sent to downstream,
> if
> > > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> > when
> > > > >>> is the watermark sent downstream?
> > > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> > and
> > > > >>> executed in mailbox. Is processing-timer allowed to be
> interrupted?
> > > > >>>
> > > > >>> Best regards,
> > > > >>> Yanfei
> > > > >>>
> > > > >>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
> > > > >>>
> > > > 
> > > >  Hi all,
> > > > 
> > > >  I would like to start a discussion on FLIP-443: Interruptible
> > > watermark
> > > >  

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-06 Thread Piotr Nowojski
Hi Zakelly,

Can you elaborate a bit more on what you have in mind? How does marking mails
as interruptible help? If an incoming async state access response arrives, it
could just request to interrupt any currently ongoing computation, regardless
of whether the currently executed mail is interruptible or not.

Best,
Piotrek

pon., 6 maj 2024 o 06:33 Zakelly Lan  napisał(a):

> Hi Piotr,
>
> Thanks for the improvement, overall +1 for this. I'd leave a minor comment:
>
> 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> continuation mail will return true. The FLIP-425 will leverage this queue
> to execute some state requests, and when the cp arrives, the operator may
> call `yield()` to drain. It may happen that the continuation mail is called
> again in `yield()`. By checking `isInterruptable()`, we can skip this mail
> and re-enqueue.
>
>
> Best,
> Zakelly
>
> On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:
>
> > Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
> >
> > Best,
> > Yanfei
> >
> > Stefan Richter  于2024年4月30日周二 21:30写道:
> > >
> > >
> > > Thanks for the improvement proposal, I’m +1 for the change!
> > >
> > > Best,
> > > Stefan
> > >
> > >
> > >
> > > > On 30. Apr 2024, at 15:23, Roman Khachatryan 
> wrote:
> > > >
> > > > Thanks for the proposal, I definitely see the need for this
> > improvement, +1.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> > > >
> > > >> Hi Yanfei,
> > > >>
> > > >> Thanks for the feedback!
> > > >>
> > > >>> 1. Currently when AbstractStreamOperator or
> AbstractStreamOperatorV2
> > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> when
> > > >>> is the watermark sent downstream?
> > > >>
> > > >> The watermark would be outputted by an operator only once all
> relevant
> > > >> timers are fired.
> > > >> In other words, if firing of timers is interrupted a continuation
> > mail to
> > > >> continue firing those
> > > >> interrupted timers is created. Watermark will be emitted downstream
> > at the
> > > >> end of that
> > > >> continuation mail.
> > > >>
> > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> and
> > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > >>
> > > > >> Yes, both firing processing and event time timers share the same code
> > and
> > > >> both will
> > > >> support interruptions in the same way. Actually I've renamed the
> FLIP
> > from
> > > >>
> > > >>> Interruptible watermarks processing
> > > >>
> > > >> to:
> > > >>
> > > >>> Interruptible timers firing
> > > >>
> > > >> to make this more clear.
> > > >>
> > > >> Best,
> > > >> Piotrek
> > > >>
> > > >> wt., 30 kwi 2024 o 06:08 Yanfei Lei 
> napisał(a):
> > > >>
> > > >>> Hi Piotrek,
> > > >>>
> > > >>> Thanks for this proposal. It looks like it will shorten the
> > checkpoint
> > > >>> duration, especially in the case of back pressure. +1 for it!  I'd
> > > >>> like to ask some questions to understand your thoughts more
> > precisely.
> > > >>>
> > > >>> 1. Currently when AbstractStreamOperator or
> AbstractStreamOperatorV2
> > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> when
> > > >>> is the watermark sent downstream?
> > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> and
> > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > >>>
> > > >>> Best regards,
> > > >>> Yanfei
> > > >>>
> > > >>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
> > > >>>
> > > 
> > >  Hi all,
> > > 
> > >  I would like to start a discussion on FLIP-443: Interruptible
> > watermark
> > >  processing.
> > > 
> > > 
> >
> https://www.google.com/url?q=https://cwiki.apache.org/confluence/x/qgn9EQ=gmail-imap=171508837000=AOvVaw0eTZDvLwdZUDai5GqoSGrD
> > > 
> > >  This proposal tries to make Flink's subtask thread more responsive
> > when
> > >  processing watermarks/firing timers, and make those operations
> > >  interruptible/break them apart into smaller steps. At the same
> time,
> > > >> the
> > >  proposed solution could be potentially adopted in other places in
> > the
> > > >>> code
> > >  base as well, to solve similar problems with other flatMap-like
> > > >> operators
> > >  (non windowed joins, aggregations, CepOperator, ...).
> > > 
> > >  I'm looking forward to your thoughts.
> > > 
> > >  Best,
> > >  Piotrek
> > >
> >
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-05 Thread Zakelly Lan
Hi Piotr,

Thanks for the improvement, overall +1 for this. I have one minor comment:

1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
continuation mail will return true. FLIP-425 will leverage this queue
to execute some state requests, and when the checkpoint arrives, the operator
may call `yield()` to drain. It may happen that the continuation mail is
executed again in `yield()`. By checking `isInterruptable()`, we can skip this
mail and re-enqueue it.


Best,
Zakelly

On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:

> Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
>
> Best,
> Yanfei
>
> Stefan Richter  于2024年4月30日周二 21:30写道:
> >
> >
> > Thanks for the improvement proposal, I’m +1 for the change!
> >
> > Best,
> > Stefan
> >
> >
> >
> > > On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> > >
> > > Thanks for the proposal, I definitely see the need for this
> improvement, +1.
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> > >
> > >> Hi Yanfei,
> > >>
> > >> Thanks for the feedback!
> > >>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>
> > >> The watermark would be outputted by an operator only once all relevant
> > >> timers are fired.
> > >> In other words, if firing of timers is interrupted a continuation
> mail to
> > >> continue firing those
> > >> interrupted timers is created. Watermark will be emitted downstream
> at the
> > >> end of that
> > >> continuation mail.
> > >>
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>
> > > >> Yes, both firing processing and event time timers share the same code
> and
> > >> both will
> > >> support interruptions in the same way. Actually I've renamed the FLIP
> from
> > >>
> > >>> Interruptible watermarks processing
> > >>
> > >> to:
> > >>
> > >>> Interruptible timers firing
> > >>
> > >> to make this more clear.
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > >> wt., 30 kwi 2024 o 06:08 Yanfei Lei  napisał(a):
> > >>
> > >>> Hi Piotrek,
> > >>>
> > >>> Thanks for this proposal. It looks like it will shorten the
> checkpoint
> > >>> duration, especially in the case of back pressure. +1 for it!  I'd
> > >>> like to ask some questions to understand your thoughts more
> precisely.
> > >>>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>>
> > >>> Best regards,
> > >>> Yanfei
> > >>>
> > >>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
> > >>>
> > 
> > >>>> Hi all,
> > >>>>
> > >>>> I would like to start a discussion on FLIP-443: Interruptible watermark
> > >>>> processing.
> > >>>>
> > >>>> https://www.google.com/url?q=https://cwiki.apache.org/confluence/x/qgn9EQ=gmail-imap=171508837000=AOvVaw0eTZDvLwdZUDai5GqoSGrD
> > >>>>
> > >>>> This proposal tries to make Flink's subtask thread more responsive when
> > >>>> processing watermarks/firing timers, and make those operations
> > >>>> interruptible/break them apart into smaller steps. At the same time, the
> > >>>> proposed solution could be potentially adopted in other places in the code
> > >>>> base as well, to solve similar problems with other flatMap-like operators
> > >>>> (non windowed joins, aggregations, CepOperator, ...).
> > >>>>
> > >>>> I'm looking forward to your thoughts.
> > >>>>
> > >>>> Best,
> > >>>> Piotrek
> >
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-01 Thread Yanfei Lei
Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.

Best,
Yanfei

Stefan Richter  于2024年4月30日周二 21:30写道:
>
>
> Thanks for the improvement proposal, I’m +1 for the change!
>
> Best,
> Stefan
>
>
>
> > On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> >
> > Thanks for the proposal, I definitely see the need for this improvement, +1.
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> >
> >> Hi Yanfei,
> >>
> >> Thanks for the feedback!
> >>
> >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> >>> processes a watermark, the watermark will be sent to downstream, if
> >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> >>> is the watermark sent downstream?
> >>
> >> The watermark would be outputted by an operator only once all relevant
> >> timers are fired.
> >> In other words, if firing of timers is interrupted a continuation mail to
> >> continue firing those
> >> interrupted timers is created. Watermark will be emitted downstream at the
> >> end of that
> >> continuation mail.
> >>
> >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> >>
> >> Yes, both firing processing and event time timers share the same code and
> >> both will
> >> support interruptions in the same way. Actually I've renamed the FLIP from
> >>
> >>> Interruptible watermarks processing
> >>
> >> to:
> >>
> >>> Interruptible timers firing
> >>
> >> to make this more clear.
> >>
> >> Best,
> >> Piotrek
> >>
> >> wt., 30 kwi 2024 o 06:08 Yanfei Lei  napisał(a):
> >>
> >>> Hi Piotrek,
> >>>
> >>> Thanks for this proposal. It looks like it will shorten the checkpoint
> >>> duration, especially in the case of back pressure. +1 for it!  I'd
> >>> like to ask some questions to understand your thoughts more precisely.
> >>>
> >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> >>> processes a watermark, the watermark will be sent to downstream, if
> >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> >>> is the watermark sent downstream?
> >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> >>>
> >>> Best regards,
> >>> Yanfei
> >>>
> >>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
> >>>
> 
> >>>> Hi all,
> >>>>
> >>>> I would like to start a discussion on FLIP-443: Interruptible watermark
> >>>> processing.
> >>>>
> >>>> https://www.google.com/url?q=https://cwiki.apache.org/confluence/x/qgn9EQ=gmail-imap=171508837000=AOvVaw0eTZDvLwdZUDai5GqoSGrD
> >>>>
> >>>> This proposal tries to make Flink's subtask thread more responsive when
> >>>> processing watermarks/firing timers, and make those operations
> >>>> interruptible/break them apart into smaller steps. At the same time, the
> >>>> proposed solution could be potentially adopted in other places in the code
> >>>> base as well, to solve similar problems with other flatMap-like operators
> >>>> (non windowed joins, aggregations, CepOperator, ...).
> >>>>
> >>>> I'm looking forward to your thoughts.
> >>>>
> >>>> Best,
> >>>> Piotrek
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-30 Thread Stefan Richter

Thanks for the improvement proposal, I’m +1 for the change!

Best,
Stefan



> On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> 
> Thanks for the proposal, I definitely see the need for this improvement, +1.
> 
> Regards,
> Roman
> 
> 
> On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> 
>> Hi Yanfei,
>> 
>> Thanks for the feedback!
>> 
>>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
>>> processes a watermark, the watermark will be sent to downstream, if
>>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
>>> is the watermark sent downstream?
>> 
>> The watermark would be outputted by an operator only once all relevant
>> timers are fired.
>> In other words, if firing of timers is interrupted a continuation mail to
>> continue firing those
>> interrupted timers is created. Watermark will be emitted downstream at the
>> end of that
>> continuation mail.
>> 
>>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
>>> executed in mailbox. Is processing-timer allowed to be interrupted?
>> 
>> Yes, both firing processing and event time timers share the same code and
>> both will
>> support interruptions in the same way. Actually I've renamed the FLIP from
>> 
>>> Interruptible watermarks processing
>> 
>> to:
>> 
>>> Interruptible timers firing
>> 
>> to make this more clear.
>> 
>> Best,
>> Piotrek
>> 
>> wt., 30 kwi 2024 o 06:08 Yanfei Lei  napisał(a):
>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for this proposal. It looks like it will shorten the checkpoint
>>> duration, especially in the case of back pressure. +1 for it!  I'd
>>> like to ask some questions to understand your thoughts more precisely.
>>> 
>>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
>>> processes a watermark, the watermark will be sent to downstream, if
>>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
>>> is the watermark sent downstream?
>>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
>>> executed in mailbox. Is processing-timer allowed to be interrupted?
>>> 
>>> Best regards,
>>> Yanfei
>>> 
>>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
>>> 
 
>>>> Hi all,
>>>>
>>>> I would like to start a discussion on FLIP-443: Interruptible watermark
>>>> processing.
>>>>
>>>> https://www.google.com/url?q=https://cwiki.apache.org/confluence/x/qgn9EQ=gmail-imap=171508837000=AOvVaw0eTZDvLwdZUDai5GqoSGrD
>>>>
>>>> This proposal tries to make Flink's subtask thread more responsive when
>>>> processing watermarks/firing timers, and make those operations
>>>> interruptible/break them apart into smaller steps. At the same time, the
>>>> proposed solution could be potentially adopted in other places in the code
>>>> base as well, to solve similar problems with other flatMap-like operators
>>>> (non windowed joins, aggregations, CepOperator, ...).
>>>>
>>>> I'm looking forward to your thoughts.
>>>>
>>>> Best,
>>>> Piotrek



Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-30 Thread Roman Khachatryan
Thanks for the proposal, I definitely see the need for this improvement, +1.

Regards,
Roman


On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski  wrote:

> Hi Yanfei,
>
> Thanks for the feedback!
>
> > 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > processes a watermark, the watermark will be sent to downstream, if
> > the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > is the watermark sent downstream?
>
> The watermark would be outputted by an operator only once all relevant
> timers are fired.
> In other words, if firing of timers is interrupted a continuation mail to
> continue firing those
> interrupted timers is created. Watermark will be emitted downstream at the
> end of that
> continuation mail.
>
> > 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > executed in mailbox. Is processing-timer allowed to be interrupted?
>
> Yes, both firing processing and event time timers share the same code and
> both will
> support interruptions in the same way. Actually I've renamed the FLIP from
>
> > Interruptible watermarks processing
>
> to:
>
> > Interruptible timers firing
>
> to make this more clear.
>
> Best,
> Piotrek
>
> On Tue, Apr 30, 2024 at 06:08, Yanfei Lei wrote:
>
> > Hi Piotrek,
> >
> > Thanks for this proposal. It looks like it will shorten the checkpoint
> > duration, especially in the case of back pressure. +1 for it!  I'd
> > like to ask some questions to understand your thoughts more precisely.
> >
> > 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > processes a watermark, the watermark will be sent to downstream, if
> > the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > is the watermark sent downstream?
> > 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > executed in mailbox. Is processing-timer allowed to be interrupted?
> >
> > Best regards,
> > Yanfei
> >
> > On Mon, Apr 29, 2024 at 21:57, Piotr Nowojski wrote:
> >
> > >
> > > Hi all,
> > >
> > > I would like to start a discussion on FLIP-443: Interruptible watermark
> > > processing.
> > >
> > > https://cwiki.apache.org/confluence/x/qgn9EQ
> > >
> > > This proposal tries to make Flink's subtask thread more responsive when
> > > processing watermarks/firing timers, and make those operations
> > > interruptible/break them apart into smaller steps. At the same time, the
> > > proposed solution could be potentially adopted in other places in the code
> > > base as well, to solve similar problems with other flatMap-like operators
> > > (non windowed joins, aggregations, CepOperator, ...).
> > >
> > > I'm looking forward to your thoughts.
> > >
> > > Best,
> > > Piotrek
> >
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-30 Thread Piotr Nowojski
Hi Yanfei,

Thanks for the feedback!

> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> processes a watermark, the watermark will be sent to downstream, if
> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> is the watermark sent downstream?

An operator would emit the watermark only once all relevant timers have
fired. In other words, if the firing of timers is interrupted, a
continuation mail is created to keep firing the remaining timers. The
watermark is emitted downstream at the end of that continuation mail.
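
To illustrate the ordering described above, here is a minimal, self-contained
sketch. It is not taken from the FLIP or from Flink's code: the class
`InterruptibleTimersSketch`, its fields, and the "yield whenever other mail is
pending" check are hypothetical stand-ins for the real mailbox and timer
service, chosen only to show when the watermark would be emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of an operator whose timer firing can be interrupted.
class InterruptibleTimersSketch {
    // Stand-in for the task mailbox: a FIFO queue of pending mails.
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    // Registered timers, ordered by timestamp.
    final PriorityQueue<Long> timers = new PriorityQueue<>();
    // Records everything the operator does, in order.
    final List<String> output = new ArrayList<>();

    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            output.add("timer@" + timers.poll());
            boolean moreTimersDue = !timers.isEmpty() && timers.peek() <= watermark;
            // Assumed interruption rule: yield if other mail is waiting.
            if (moreTimersDue && !mailbox.isEmpty()) {
                // Continuation mail: resume firing the remaining timers later.
                mailbox.addLast(() -> advanceWatermark(watermark));
                return; // the watermark is NOT emitted yet
            }
        }
        // All relevant timers have fired, so the watermark can go downstream.
        output.add("watermark@" + watermark);
    }

    // Runs queued mails in order, including any continuation mails.
    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.pollFirst()) != null) {
            mail.run();
        }
    }
}
```

With timers at 1, 2 and 3 and one unrelated mail already queued,
`advanceWatermark(3)` fires the first timer and yields. Draining the mailbox
then runs the queued mail, fires the remaining two timers, and only then
emits the watermark.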

> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> executed in mailbox. Is processing-timer allowed to be interrupted?

Yes, processing-time and event-time timers share the same firing code, and
both will support interruptions in the same way. Actually, I've renamed the
FLIP from

> Interruptible watermarks processing

to:

> Interruptible timers firing

to make this more clear.

Best,
Piotrek

On Tue, Apr 30, 2024 at 06:08, Yanfei Lei wrote:

> Hi Piotrek,
>
> Thanks for this proposal. It looks like it will shorten the checkpoint
> duration, especially in the case of back pressure. +1 for it!  I'd
> like to ask some questions to understand your thoughts more precisely.
>
> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> processes a watermark, the watermark will be sent to downstream, if
> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> is the watermark sent downstream?
> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> executed in mailbox. Is processing-timer allowed to be interrupted?
>
> Best regards,
> Yanfei
>
> On Mon, Apr 29, 2024 at 21:57, Piotr Nowojski wrote:
>
> >
> > Hi all,
> >
> > I would like to start a discussion on FLIP-443: Interruptible watermark
> > processing.
> >
> > https://cwiki.apache.org/confluence/x/qgn9EQ
> >
> > This proposal tries to make Flink's subtask thread more responsive when
> > processing watermarks/firing timers, and make those operations
> > interruptible/break them apart into smaller steps. At the same time, the
> > proposed solution could be potentially adopted in other places in the code
> > base as well, to solve similar problems with other flatMap-like operators
> > (non windowed joins, aggregations, CepOperator, ...).
> >
> > I'm looking forward to your thoughts.
> >
> > Best,
> > Piotrek
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-29 Thread Yanfei Lei
Hi Piotrek,

Thanks for this proposal. It looks like it will shorten the checkpoint
duration, especially in the case of back pressure. +1 for it!  I'd
like to ask some questions to understand your thoughts more precisely.

1. Currently, when AbstractStreamOperator or AbstractStreamOperatorV2
processes a watermark, the watermark is sent downstream. If
`InternalTimerServiceImpl#advanceWatermark` is interrupted, when is the
watermark sent downstream?
2. IIUC, the firing of processing-time timers is also encapsulated in a mail
and executed in the mailbox. Can processing-time timers be interrupted too?

Best regards,
Yanfei

On Mon, Apr 29, 2024 at 21:57, Piotr Nowojski wrote:

>
> Hi all,
>
> I would like to start a discussion on FLIP-443: Interruptible watermark
> processing.
>
> https://cwiki.apache.org/confluence/x/qgn9EQ
>
> This proposal tries to make Flink's subtask thread more responsive when
> processing watermarks/firing timers, and make those operations
> interruptible/break them apart into smaller steps. At the same time, the
> proposed solution could be potentially adopted in other places in the code
> base as well, to solve similar problems with other flatMap-like operators
> (non windowed joins, aggregations, CepOperator, ...).
>
> I'm looking forward to your thoughts.
>
> Best,
> Piotrek