Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-18 Thread Leonard Xu
+1 for interval-during-backlog


best,
leonard

> On Jul 14, 2023, at 11:38 PM, Piotr Nowojski  wrote:
> 
> Hi All,
> 
> We had a lot of off-line discussions. As a result I would suggest dropping
> the idea of introducing an end-to-end-latency concept, until
> we can properly implement it, which will require more designing and
> experimenting. I would suggest starting with a more manual solution,
> where the user needs to configure concrete parameters, like
> `execution.checkpointing.max-interval` or `execution.flush-interval`.
> 
> FLIP-309 looks good to me, I would just rename
> `execution.checkpointing.interval-during-backlog` to
> `execution.checkpointing.max-interval`.
> 
> I would also reference future work: a solution that would allow setting
> `isProcessingBacklog` for sources like Kafka will be introduced via
> FLIP-328 [1].
> 
> Best,
> Piotrek
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> 
> Wed, Jul 12, 2023 at 03:49 Dong Lin  wrote:
> 
>> Hi Piotr,
>> 
>> I think I understand your motivation for suggesting
>> execution.slow-end-to-end-latency now. Please see my followup comments
>> (after the previous email) inline.
>> 
>> On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
>> wrote:
>> 
>>> Hi Dong,
>>> 
>>> Thanks for the updates, a couple of comments:
>>> 
>>>> If a record is generated by a source when the source's
>>> isProcessingBacklog is true, or some of the records used to
>>>> derive this record (by an operator) has isBacklog = true, then this
>>> record should have isBacklog = true. Otherwise,
>>>> this record should have isBacklog = false.
>>> 
>>> nit:
>>> I think this conflicts with "Rule of thumb for non-source operators to
>> set
>>> isBacklog = true for the records it emits:"
>>> section later on, when it comes to a case if an operator has mixed
>>> isBacklog = false and isBacklog = true inputs.
>>> 
>>>> execution.checkpointing.interval-during-backlog
>>> 
>>> Do we need to define this as an interval config parameter? Won't that add
>>> an option that will be almost instantly deprecated
>>> because what we actually would like to have is:
>>> execution.slow-end-to-end-latency and execution.end-to-end-latency
>>> 
>> 
>> I guess you are suggesting that we should allow users to specify a higher
>> end-to-end latency budget for records that are emitted by a two-phase
>> commit sink than for records that are emitted by a non-two-phase commit
>> sink.
>> 
>> My concern with this approach is that it will increase the complexity of
>> the definition of "processing latency requirement", as well as the
>> complexity of the Flink runtime code that handles it. Currently, FLIP-325
>> defines end-to-end latency as an attribute of the record that is
>> statically assigned when the record is generated at the source, regardless
>> of how it will be emitted later in the topology. If we make the changes
>> proposed above, we would need to define the latency requirement w.r.t. the
>> operators that the record travels through before its result is
>> emitted, which is less intuitive and more complex.
>> 
>> For now, it is not clear whether it is necessary to have two categories of
>> latency requirement for the same job. Maybe it is reasonable to assume that
>> if a job has a two-phase commit sink and the user is OK with emitting some
>> results at a 1-minute interval, then more likely than not the user is also OK
>> with emitting all results at a 1-minute interval, including those that go
>> through a non-two-phase commit sink?
>> 
>> If we do want to support a different end-to-end latency depending on whether
>> the record is emitted by a two-phase commit sink, I would prefer to still
>> use execution.checkpointing.interval-during-backlog instead of
>> execution.slow-end-to-end-latency. This allows us to keep the concept of
>> end-to-end latency simple. Also, by explicitly including "checkpointing
>> interval" in the name of the config that directly affects checkpointing
>> interval, we can make it easier and more intuitive for users to understand
>> the impact and set proper value for such configs.
>> 
>> What do you think?
>> 
>> Best,
>> Dong
>> 
>> 
>>> Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe
>> a
>>> better name), and for the time being
>>> use it as the checkpoint interval value during backlog?
>> 
>> 
>>> Or do you envision that in the future users will be configuring only:
>>> - execution.end-to-end-latency
>>> and only optionally:
>>> - execution.checkpointing.interval-during-backlog
>>> ?
>>> 
>>> Best Piotrek
>>> 
>>> PS, I will read the summary that you have just published later, but I
>> think
>>> we don't need to block this FLIP on the
>>> existence of that high level summary.
>>> 
>>> Tue, Jul 11, 2023 at 17:49 Dong Lin  wrote:
>>> 
 Hi Piotr and everyone,
 
 I have documented the vision with a summary of the existing work in
>> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-18 Thread Piotr Nowojski
Thanks Dong!

Piotrek

Tue, Jul 18, 2023 at 06:04 Dong Lin  wrote:

> Hi all,
>
> I have updated FLIP-309 as suggested by Piotr to include a reference to
> FLIP-328 in the future work section.
>
> Piotr, Stephan, and I discussed offline the choice
> between execution.checkpointing.max-interval and
> execution.checkpointing.interval-during-backlog.
> The advantage of using "max-interval" is that Flink runtime can have more
> flexibility to decide when/how to adjust checkpointing intervals (based on
> information other than backlog). The advantage of using
> "interval-during-backlog" is that it is clearer to the user when/how this
> configured interval is used. Since there is no immediate need for the extra
> flexibility as of this FLIP, we agreed to go with interval-during-backlog
> for now. And we can rename this config to e.g.
> execution.checkpointing.max-interval when needed in the future.
>
> Thanks everyone for all the reviews and suggestions! And special thanks to
> Piotr and Stephan for taking extra time to provide detailed reviews and
> suggestions offline!
>
> Since there is no further comment, I will open the voting thread for this
> FLIP.
>
> Cheers,
> Dong
>
>
> On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski 
> wrote:
>
> > Hi All,
> >
> > We had a lot of off-line discussions. As a result I would suggest
> dropping
> > the idea of introducing an end-to-end-latency concept, until
> > we can properly implement it, which will require more designing and
> > experimenting. I would suggest starting with a more manual solution,
> > where the user needs to configure concrete parameters, like
> > `execution.checkpointing.max-interval` or `execution.flush-interval`.
> >
> > FLIP-309 looks good to me, I would just rename
> > `execution.checkpointing.interval-during-backlog` to
> > `execution.checkpointing.max-interval`.
> >
> > I would also reference future work: a solution that would allow setting
> > `isProcessingBacklog` for sources like Kafka will be introduced via
> > FLIP-328 [1].
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >
> > Wed, Jul 12, 2023 at 03:49 Dong Lin  wrote:
> >
> > > Hi Piotr,
> > >
> > > I think I understand your motivation for suggesting
> > > execution.slow-end-to-end-latency now. Please see my followup comments
> > > (after the previous email) inline.
> > >
> > > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the updates, a couple of comments:
> > > >
> > > > > If a record is generated by a source when the source's
> > > > isProcessingBacklog is true, or some of the records used to
> > > > > derive this record (by an operator) has isBacklog = true, then this
> > > > record should have isBacklog = true. Otherwise,
> > > > > this record should have isBacklog = false.
> > > >
> > > > nit:
> > > > I think this conflicts with "Rule of thumb for non-source operators
> to
> > > set
> > > > isBacklog = true for the records it emits:"
> > > > section later on, when it comes to a case if an operator has mixed
> > > > isBacklog = false and isBacklog = true inputs.
> > > >
> > > > > execution.checkpointing.interval-during-backlog
> > > >
> > > > Do we need to define this as an interval config parameter? Won't that
> > add
> > > > an option that will be almost instantly deprecated
> > > > because what we actually would like to have is:
> > > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > > >
> > >
> > > I guess you are suggesting that we should allow users to specify a higher
> > > end-to-end latency budget for records that are emitted by a two-phase
> > > commit sink than for records that are emitted by a non-two-phase commit
> > > sink.
> > >
> > > My concern with this approach is that it will increase the complexity of
> > > the definition of "processing latency requirement", as well as the
> > > complexity of the Flink runtime code that handles it. Currently, FLIP-325
> > > defines end-to-end latency as an attribute of the record that is
> > > statically assigned when the record is generated at the source, regardless
> > > of how it will be emitted later in the topology. If we make the changes
> > > proposed above, we would need to define the latency requirement w.r.t. the
> > > operators that the record travels through before its result is
> > > emitted, which is less intuitive and more complex.
> > >
> > > For now, it is not clear whether it is necessary to have two categories
> > > of latency requirement for the same job. Maybe it is reasonable to assume
> > > that if a job has a two-phase commit sink and the user is OK with emitting
> > > some results at a 1-minute interval, then more likely than not the user is
> > > also OK with emitting all results at a 1-minute interval, including those that 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-17 Thread Dong Lin
Hi all,

I have updated FLIP-309 as suggested by Piotr to include a reference to
FLIP-328 in the future work section.

Piotr, Stephan, and I discussed offline the choice
between execution.checkpointing.max-interval and
execution.checkpointing.interval-during-backlog.
The advantage of using "max-interval" is that Flink runtime can have more
flexibility to decide when/how to adjust checkpointing intervals (based on
information other than backlog). The advantage of using
"interval-during-backlog" is that it is clearer to the user when/how this
configured interval is used. Since there is no immediate need for the extra
flexibility as of this FLIP, we agreed to go with interval-during-backlog
for now. And we can rename this config to e.g.
execution.checkpointing.max-interval when needed in the future.
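[Editor's note: for readers following the thread, below is a minimal sketch of how
the two intervals could be configured once FLIP-309 lands. It uses Flink's existing
Configuration API; the "interval-during-backlog" key is the one proposed in the FLIP
(not yet released at the time of this thread), and the concrete values are
placeholders, not recommendations.]

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Regular checkpointing interval, used while processing real-time (non-backlog) data.
    conf.setString("execution.checkpointing.interval", "30 s");
    // Longer interval proposed by FLIP-309, used while any source reports
    // isProcessingBacklog = true (key name as proposed, subject to change).
    conf.setString("execution.checkpointing.interval-during-backlog", "10 min");

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);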

Thanks everyone for all the reviews and suggestions! And special thanks to
Piotr and Stephan for taking extra time to provide detailed reviews and
suggestions offline!

Since there is no further comment, I will open the voting thread for this
FLIP.

Cheers,
Dong


On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski 
wrote:

> Hi All,
>
> We had a lot of off-line discussions. As a result I would suggest dropping
> the idea of introducing an end-to-end-latency concept, until
> we can properly implement it, which will require more designing and
> experimenting. I would suggest starting with a more manual solution,
> where the user needs to configure concrete parameters, like
> `execution.checkpointing.max-interval` or `execution.flush-interval`.
>
> FLIP-309 looks good to me, I would just rename
> `execution.checkpointing.interval-during-backlog` to
> `execution.checkpointing.max-interval`.
>
> I would also reference future work: a solution that would allow setting
> `isProcessingBacklog` for sources like Kafka will be introduced via
> FLIP-328 [1].
>
> Best,
> Piotrek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>
> Wed, Jul 12, 2023 at 03:49 Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > I think I understand your motivation for suggesting
> > execution.slow-end-to-end-latency now. Please see my followup comments
> > (after the previous email) inline.
> >
> > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the updates, a couple of comments:
> > >
> > > > If a record is generated by a source when the source's
> > > isProcessingBacklog is true, or some of the records used to
> > > > derive this record (by an operator) has isBacklog = true, then this
> > > record should have isBacklog = true. Otherwise,
> > > > this record should have isBacklog = false.
> > >
> > > nit:
> > > I think this conflicts with "Rule of thumb for non-source operators to
> > set
> > > isBacklog = true for the records it emits:"
> > > section later on, when it comes to a case if an operator has mixed
> > > isBacklog = false and isBacklog = true inputs.
> > >
> > > > execution.checkpointing.interval-during-backlog
> > >
> > > Do we need to define this as an interval config parameter? Won't that
> add
> > > an option that will be almost instantly deprecated
> > > because what we actually would like to have is:
> > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > >
> >
> > I guess you are suggesting that we should allow users to specify a higher
> > end-to-end latency budget for records that are emitted by a two-phase
> > commit sink than for records that are emitted by a non-two-phase commit
> > sink.
> >
> > My concern with this approach is that it will increase the complexity of
> > the definition of "processing latency requirement", as well as the
> > complexity of the Flink runtime code that handles it. Currently, FLIP-325
> > defines end-to-end latency as an attribute of the record that is
> > statically assigned when the record is generated at the source, regardless
> > of how it will be emitted later in the topology. If we make the changes
> > proposed above, we would need to define the latency requirement w.r.t. the
> > operators that the record travels through before its result is
> > emitted, which is less intuitive and more complex.
> >
> > For now, it is not clear whether it is necessary to have two categories of
> > latency requirement for the same job. Maybe it is reasonable to assume that
> > if a job has a two-phase commit sink and the user is OK with emitting some
> > results at a 1-minute interval, then more likely than not the user is also
> > OK with emitting all results at a 1-minute interval, including those that go
> > through a non-two-phase commit sink?
> >
> > If we do want to support a different end-to-end latency depending on
> > whether the record is emitted by a two-phase commit sink, I would prefer to
> > still use execution.checkpointing.interval-during-backlog instead of
> > execution.slow-end-to-end-latency. 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-14 Thread Piotr Nowojski
Hi All,

We had a lot of off-line discussions. As a result I would suggest dropping
the idea of introducing an end-to-end-latency concept, until
we can properly implement it, which will require more designing and
experimenting. I would suggest starting with a more manual solution,
where the user needs to configure concrete parameters, like
`execution.checkpointing.max-interval` or `execution.flush-interval`.

FLIP-309 looks good to me, I would just rename
`execution.checkpointing.interval-during-backlog` to
`execution.checkpointing.max-interval`.

I would also reference future work: a solution that would allow setting
`isProcessingBacklog` for sources like Kafka will be introduced via
FLIP-328 [1].

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag

Wed, Jul 12, 2023 at 03:49 Dong Lin  wrote:

> Hi Piotr,
>
> I think I understand your motivation for suggesting
> execution.slow-end-to-end-latency now. Please see my followup comments
> (after the previous email) inline.
>
> On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> wrote:
>
> > Hi Dong,
> >
> > Thanks for the updates, a couple of comments:
> >
> > > If a record is generated by a source when the source's
> > isProcessingBacklog is true, or some of the records used to
> > > derive this record (by an operator) has isBacklog = true, then this
> > record should have isBacklog = true. Otherwise,
> > > this record should have isBacklog = false.
> >
> > nit:
> > I think this conflicts with "Rule of thumb for non-source operators to
> set
> > isBacklog = true for the records it emits:"
> > section later on, when it comes to a case if an operator has mixed
> > isBacklog = false and isBacklog = true inputs.
> >
> > > execution.checkpointing.interval-during-backlog
> >
> > Do we need to define this as an interval config parameter? Won't that add
> > an option that will be almost instantly deprecated
> > because what we actually would like to have is:
> > execution.slow-end-to-end-latency and execution.end-to-end-latency
> >
>
> I guess you are suggesting that we should allow users to specify a higher
> end-to-end latency budget for records that are emitted by a two-phase
> commit sink than for records that are emitted by a non-two-phase commit
> sink.
>
> My concern with this approach is that it will increase the complexity of
> the definition of "processing latency requirement", as well as the
> complexity of the Flink runtime code that handles it. Currently, FLIP-325
> defines end-to-end latency as an attribute of the record that is
> statically assigned when the record is generated at the source, regardless
> of how it will be emitted later in the topology. If we make the changes
> proposed above, we would need to define the latency requirement w.r.t. the
> operators that the record travels through before its result is
> emitted, which is less intuitive and more complex.
>
> For now, it is not clear whether it is necessary to have two categories of
> latency requirement for the same job. Maybe it is reasonable to assume that
> if a job has a two-phase commit sink and the user is OK with emitting some
> results at a 1-minute interval, then more likely than not the user is also OK
> with emitting all results at a 1-minute interval, including those that go
> through a non-two-phase commit sink?
>
> If we do want to support a different end-to-end latency depending on whether
> the record is emitted by a two-phase commit sink, I would prefer to still
> use execution.checkpointing.interval-during-backlog instead of
> execution.slow-end-to-end-latency. This allows us to keep the concept of
> end-to-end latency simple. Also, by explicitly including "checkpointing
> interval" in the name of the config that directly affects checkpointing
> interval, we can make it easier and more intuitive for users to understand
> the impact and set proper value for such configs.
>
> What do you think?
>
> Best,
> Dong
>
>
> Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe
> a
> > better name), and for the time being
> > use it as the checkpoint interval value during backlog?
>
>
> > Or do you envision that in the future users will be configuring only:
> > - execution.end-to-end-latency
> > and only optionally:
> > - execution.checkpointing.interval-during-backlog
> > ?
> >
> > Best Piotrek
> >
> > PS, I will read the summary that you have just published later, but I
> think
> > we don't need to block this FLIP on the
> > existence of that high level summary.
> >
> > Tue, Jul 11, 2023 at 17:49 Dong Lin  wrote:
> >
> > > Hi Piotr and everyone,
> > >
> > > I have documented the vision with a summary of the existing work in
> this
> > > doc. Please feel free to review/comment/edit this doc. Looking forward
> to
> > > working with you together in this line of work.
> > >
> > >
> > >
> >
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr,

I think I understand your motivation for suggesting
execution.slow-end-to-end-latency now. Please see my followup comments
(after the previous email) inline.

On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Thanks for the updates, a couple of comments:
>
> > If a record is generated by a source when the source's
> isProcessingBacklog is true, or some of the records used to
> > derive this record (by an operator) has isBacklog = true, then this
> record should have isBacklog = true. Otherwise,
> > this record should have isBacklog = false.
>
> nit:
> I think this conflicts with "Rule of thumb for non-source operators to set
> isBacklog = true for the records it emits:"
> section later on, when it comes to a case if an operator has mixed
> isBacklog = false and isBacklog = true inputs.
>
> > execution.checkpointing.interval-during-backlog
>
> Do we need to define this as an interval config parameter? Won't that add
> an option that will be almost instantly deprecated
> because what we actually would like to have is:
> execution.slow-end-to-end-latency and execution.end-to-end-latency
>

I guess you are suggesting that we should allow users to specify a higher
end-to-end latency budget for records that are emitted by a two-phase
commit sink than for records that are emitted by a non-two-phase commit
sink.

My concern with this approach is that it will increase the complexity of
the definition of "processing latency requirement", as well as the
complexity of the Flink runtime code that handles it. Currently, FLIP-325
defines end-to-end latency as an attribute of the record that is
statically assigned when the record is generated at the source, regardless
of how it will be emitted later in the topology. If we make the changes
proposed above, we would need to define the latency requirement w.r.t. the
operators that the record travels through before its result is
emitted, which is less intuitive and more complex.

For now, it is not clear whether it is necessary to have two categories of
latency requirement for the same job. Maybe it is reasonable to assume that
if a job has a two-phase commit sink and the user is OK with emitting some
results at a 1-minute interval, then more likely than not the user is also OK
with emitting all results at a 1-minute interval, including those that go
through a non-two-phase commit sink?

If we do want to support a different end-to-end latency depending on whether
the record is emitted by a two-phase commit sink, I would prefer to still
use execution.checkpointing.interval-during-backlog instead of
execution.slow-end-to-end-latency. This allows us to keep the concept of
end-to-end latency simple. Also, by explicitly including "checkpointing
interval" in the name of the config that directly affects checkpointing
interval, we can make it easier and more intuitive for users to understand
the impact and set proper value for such configs.

What do you think?

Best,
Dong


> Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe a
> better name), and for the time being
> use it as the checkpoint interval value during backlog?


> Or do you envision that in the future users will be configuring only:
> - execution.end-to-end-latency
> and only optionally:
> - execution.checkpointing.interval-during-backlog
> ?
>
> Best Piotrek
>
> PS, I will read the summary that you have just published later, but I think
> we don't need to block this FLIP on the
> existence of that high level summary.
>
> Tue, Jul 11, 2023 at 17:49 Dong Lin  wrote:
>
> > Hi Piotr and everyone,
> >
> > I have documented the vision with a summary of the existing work in this
> > doc. Please feel free to review/comment/edit this doc. Looking forward to
> > working with you together in this line of work.
> >
> >
> >
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> >
> > Best,
> > Dong
> >
> > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Me and Dong chatted offline about the above mentioned issues (thanks
> for
> > > that offline chat
> > > I think it helped both of us a lot). The summary is below.
> > >
> > > > Previously, I thought you meant to add a generic logic in
> > > SourceReaderBase
> > > > to read existing metrics (e.g. backpressure) and emit the
> > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > > misunderstood your suggestions.
> > > >
> > > > After double-checking your previous suggestion, I am wondering if you
> > are
> > > > OK with the following approach:
> > > >
> > > > - Add a job-level config
> > execution.checkpointing.interval-during-backlog
> > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > isProcessingBacklog).
> > > > - When this API is invoked, it internally sends an
> > > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > > - SourceCoordinator should keep track of the latest
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Please see my reply inline.

On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Thanks for the updates, a couple of comments:
>
> > If a record is generated by a source when the source's
> isProcessingBacklog is true, or some of the records used to
> > derive this record (by an operator) has isBacklog = true, then this
> record should have isBacklog = true. Otherwise,
> > this record should have isBacklog = false.
>
> nit:
> I think this conflicts with "Rule of thumb for non-source operators to set
> isBacklog = true for the records it emits:"
> section later on, when it comes to a case if an operator has mixed
> isBacklog = false and isBacklog = true inputs.
>

Hmm... I double checked these paragraphs but could not find the conflicts.
Maybe I have missed something. Could you help explain what the conflict is?


>
> > execution.checkpointing.interval-during-backlog
>
> Do we need to define this as an interval config parameter? Won't that add
> an option that will be almost instantly deprecated
> because what we actually would like to have is:
> execution.slow-end-to-end-latency and execution.end-to-end-latency
>
> Maybe we can introduce only `execution.slow-end-to-end-latency` (% maybe a
> better name), and for the time being
> use it as the checkpoint interval value during backlog?
>

Good point!

I think it is feasible (and simpler in the long term) to have only 2
configs (i.e. execution.checkpointing.interval,
execution.end-to-end-latency), instead of 3 configs (e.g.
execution.checkpointing.interval, execution.end-to-end-latency,
execution.checkpointing.interval-during-backlog), with the following
semantics:

1) *execution.checkpointing.interval*
It is used as the checkpointing interval if any of the following conditions
are true.
- execution.end-to-end-latency is set to null
- the job does not have any two-phase-commit sink that is processing
non-backlog records.

Typically, users should set execution.checkpointing.interval to upper-bound
the amount of work that will be redone after a job failover.

2) *execution.end-to-end-latency*
It is the processing latency requirement for non-backlog records. If it is
not null, and if a two-phase commit sink is processing non-backlog records,
then the checkpointing interval will be set to
execution.end-to-end-latency. Its default value is null.
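[Editor's note: to make the intended selection rule concrete, here is a minimal
Java sketch of how a checkpoint trigger could pick the interval under these
semantics. The helper and parameter names below are hypothetical illustrations,
not actual Flink runtime code.]

    import java.time.Duration;

    // Hypothetical helper illustrating the rule described above.
    static Duration selectCheckpointInterval(
            Duration checkpointingInterval,           // execution.checkpointing.interval
            Duration endToEndLatency,                 // execution.end-to-end-latency, may be null
            boolean twoPhaseCommitSinkOnNonBacklog) { // any 2PC sink handling non-backlog records?
        if (endToEndLatency != null && twoPhaseCommitSinkOnNonBacklog) {
            // Fresh data is flowing through a two-phase commit sink, so checkpoint
            // often enough to meet the processing-latency requirement.
            return endToEndLatency;
        }
        // Otherwise use the regular interval, which upper-bounds the amount of
        // work redone after a failover.
        return checkpointingInterval;
    }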

In order to achieve this goal, in addition to renaming the config from
execution.checkpointing.interval-during-backlog to execution.end-to-end-latency,
we will need to add RecordAttributes (from FLIP-325) to propagate the
isBacklog value to the sink operator, add a public API for the sink operator, and
update every two-phase commit sink operator to report its isBacklog status to
the JM so that the JM can adjust the checkpointing interval.

Overall I would also prefer to take the long term approach rather than
adding a config that we will deprecate in the near future.

If this looks good to you overall, I will update the FLIP as described
above.

BTW, I am not sure I get the idea of why we need both
execution.slow-end-to-end-latency and execution.end-to-end-latency, or why
we need to have "slow" in the config name. Can you help explain it?

What do you think?

Best,
Dong


> Or do you envision that in the future users will be configuring only:
> - execution.end-to-end-latency
> and only optionally:
> - execution.checkpointing.interval-during-backlog
> ?
>
> Best Piotrek
>
> PS, I will read the summary that you have just published later, but I think
> we don't need to block this FLIP on the
> existence of that high level summary.
>
> Tue, Jul 11, 2023 at 17:49 Dong Lin  wrote:
>
> > Hi Piotr and everyone,
> >
> > I have documented the vision with a summary of the existing work in this
> > doc. Please feel free to review/comment/edit this doc. Looking forward to
> > working with you together in this line of work.
> >
> >
> >
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> >
> > Best,
> > Dong
> >
> > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Me and Dong chatted offline about the above mentioned issues (thanks
> for
> > > that offline chat
> > > I think it helped both of us a lot). The summary is below.
> > >
> > > > Previously, I thought you meant to add a generic logic in
> > > SourceReaderBase
> > > > to read existing metrics (e.g. backpressure) and emit the
> > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > > misunderstood your suggestions.
> > > >
> > > > After double-checking your previous suggestion, I am wondering if you
> > are
> > > > OK with the following approach:
> > > >
> > > > - Add a job-level config
> > execution.checkpointing.interval-during-backlog
> > > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > > isProcessingBacklog).
> > > > - When this API is invoked, it internally sends an
> > > > internal SourceReaderBacklogEvent to 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Piotr Nowojski
Hi Dong,

Thanks for the updates, a couple of comments:

> If a record is generated by a source when the source's
isProcessingBacklog is true, or some of the records used to
> derive this record (by an operator) has isBacklog = true, then this
record should have isBacklog = true. Otherwise,
> this record should have isBacklog = false.

nit:
I think this conflicts with "Rule of thumb for non-source operators to set
isBacklog = true for the records it emits:"
section later on, when it comes to a case if an operator has mixed
isBacklog = false and isBacklog = true inputs.
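[Editor's note: for illustration only, a minimal sketch of the derivation rule
quoted above, i.e. how a non-source operator could compute the isBacklog flag of
an output record from the input records used to derive it. The names
RecordWithBacklogFlag and deriveIsBacklog are hypothetical and not part of the
FLIP's proposed API.]

    import java.util.List;

    // Hypothetical wrapper carrying the per-record isBacklog attribute.
    record RecordWithBacklogFlag<T>(T value, boolean isBacklog) {}

    // A derived record is backlog if any record used to derive it is backlog
    // (or was produced while its source reported isProcessingBacklog = true);
    // otherwise it is non-backlog.
    static <T> boolean deriveIsBacklog(List<RecordWithBacklogFlag<T>> inputsUsed) {
        return inputsUsed.stream().anyMatch(RecordWithBacklogFlag::isBacklog);
    }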

> execution.checkpointing.interval-during-backlog

Do we need to define this as an interval config parameter? Won't that add
an option that will be almost instantly deprecated
because what we actually would like to have is:
execution.slow-end-to-end-latency and execution.end-to-end-latency

Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe a
better name), and for the time being
use it as the checkpoint interval value during backlog?

Or do you envision that in the future users will be configuring only:
- execution.end-to-end-latency
and only optionally:
- execution.checkpointing.interval-during-backlog
?

Best Piotrek

PS, I will read the summary that you have just published later, but I think
we don't need to block this FLIP on the
existence of that high level summary.

Tue, Jul 11, 2023 at 17:49 Dong Lin  wrote:

> Hi Piotr and everyone,
>
> I have documented the vision with a summary of the existing work in this
> doc. Please feel free to review/comment/edit this doc. Looking forward to
> working with you together in this line of work.
>
>
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
>
> Best,
> Dong
>
> On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski 
> wrote:
>
> > Hi All,
> >
> > Me and Dong chatted offline about the above mentioned issues (thanks for
> > that offline chat
> > I think it helped both of us a lot). The summary is below.
> >
> > > Previously, I thought you meant to add a generic logic in
> > SourceReaderBase
> > > to read existing metrics (e.g. backpressure) and emit the
> > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > misunderstood your suggestions.
> > >
> > > After double-checking your previous suggestion, I am wondering if you
> are
> > > OK with the following approach:
> > >
> > > - Add a job-level config
> execution.checkpointing.interval-during-backlog
> > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > isProcessingBacklog).
> > > - When this API is invoked, it internally sends an
> > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > > status from all its subtasks. And for now, we will hardcode the logic
> > such
> > > that if any source reader says it is under backlog, then
> > > execution.checkpointing.interval-during-backlog is used.
> > >
> > > This approach looks good to me as it can achieve the same performance
> > with
> > > the same number of public APIs for the target use-case. And I suppose
> in
> > > the future we might be able to re-use this API for source reader to set
> > its
> > > backlog status based on its backpressure metrics, which could be an
> extra
> > > advantage over the current approach.
> > >
> > > Do you think we can agree to adopt the approach described above?
> >
> > Yes, I think that's a viable approach. I would be perfectly fine to not
> > introduce
> > `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).`
> > and sending the `SourceReaderBacklogEvent` from SourceReader to JM
> > in this FLIP. It could be implemented once we would decide to add some
> more
> > generic
> > ways of detecting backlog/backpressure on the SourceReader level.
> >
> > I think we could also just keep the current proposal of adding
> > `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the
> sources
> > that
> > can set it on the `SplitEnumerator` level. Later we could merge this with
> > another
> > mechanisms of detecting "isProcessingBacklog", like based on watermark
> lag,
> > backpressure, etc, via some component running on the JM.
> >
> > At the same time I'm fine with having the "isProcessingBacklog" concept
> to
> > switch
> > runtime back and forth between high and low latency modes instead of
> > "backpressure". In FLIP-325 I have asked:
> >
> > > I think there is one thing that hasn't been discussed neither here nor
> in
> > FLIP-309. Given that we have
> > > three dimensions:
> > > - e2e latency/checkpointing interval
> > > - enabling some kind of batching/buffering on the operator level
> > > - how much resources we want to allocate to the job
> > >
> > > How do we want Flink to adjust itself between those three? For example:
> > > a) Should we assume that given Job has a fixed amount of assigned
> > resources and make it paramount that
> > >   

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr and everyone,

I have documented the vision with a summary of the existing work in this
doc. Please feel free to review/comment/edit this doc. Looking forward to
working with you together in this line of work.

https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing

Best,
Dong

On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski 
wrote:

> Hi All,
>
> Me and Dong chatted offline about the above mentioned issues (thanks for
> that offline chat
> I think it helped both of us a lot). The summary is below.
>
> > Previously, I thought you meant to add a generic logic in
> SourceReaderBase
> > to read existing metrics (e.g. backpressure) and emit the
> > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are
> > OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > isProcessingBacklog).
> > - When this API is invoked, it internally sends an
> > internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > status from all its subtasks. And for now, we will hardcode the logic
> such
> > that if any source reader says it is under backlog, then
> > execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance
> with
> > the same number of public APIs for the target use-case. And I suppose in
> > the future we might be able to re-use this API for source reader to set
> its
> > backlog status based on its backpressure metrics, which could be an extra
> > advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
>
> Yes, I think that's a viable approach. I would be perfectly fine to not
> introduce
> `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).`
> and sending the `SourceReaderBacklogEvent` from SourceReader to JM
> in this FLIP. It could be implemented once we would decide to add some more
> generic
> ways of detecting backlog/backpressure on the SourceReader level.
>
> I think we could also just keep the current proposal of adding
> `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the sources
> that
> can set it on the `SplitEnumerator` level. Later we could merge this with
> another
> mechanisms of detecting "isProcessingBacklog", like based on watermark lag,
> backpressure, etc, via some component running on the JM.
>
> At the same time I'm fine with having the "isProcessingBacklog" concept to
> switch
> runtime back and forth between high and low latency modes instead of
> "backpressure". In FLIP-325 I have asked:
>
> > I think there is one thing that hasn't been discussed neither here nor in
> FLIP-309. Given that we have
> > three dimensions:
> > - e2e latency/checkpointing interval
> > - enabling some kind of batching/buffering on the operator level
> > - how much resources we want to allocate to the job
> >
> > How do we want Flink to adjust itself between those three? For example:
> > a) Should we assume that given Job has a fixed amount of assigned
> resources and make it paramount that
> >   Flink doesn't exceed those available resources? So in case of
> backpressure, we
> >   should extend checkpointing intervals, emit records less frequently and
> in batches.
> > b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
> >   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
> >   and wait for the user or autoscaler to scale up the job?
> >
> > In case of a), I think the concept of "isProcessingBacklog" is not
> needed, we could steer the behaviour only
> > using the backpressure information.
> >
> > On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> > we can safely decrease the e2e latency/checkpoint interval even if there
> is no backpressure, to use fewer
> > resources (and let the autoscaler scale down the job).
> >
> > Do we want to have both, or only one of those? Do a) and b) complement
> one another? If job is backpressured,
> > we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> > usage
>
> After thinking this over:
> - the case that we don't have "isProcessingBacklog" information, but the
> source operator is
>   back pressured, must be intermittent. Either back pressure will go away,
> or shortly we should
>   reach the "isProcessingBacklog" state anyway
> - and even if we implement some back 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr,

Thank you for all the discussions! I will ask for a meeting in the future
when we have prolonged discussions like this :)

Please see my comments inline.

BTW, I am hoping we can make this feature available in Flink 1.18, whose
feature freeze is coming soon on July 24. If this FLIP looks good overall, do
you think it is OK to open the voting thread?

Thanks,
Dong

On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski 
wrote:

> Hi All,
>
> Me and Dong chatted offline about the above mentioned issues (thanks for
> that offline chat
> I think it helped both of us a lot). The summary is below.
>
> > Previously, I thought you meant to add a generic logic in
> SourceReaderBase
> > to read existing metrics (e.g. backpressure) and emit the
> > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are
> > OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > isProcessingBacklog).
> > - When this API is invoked, it internally sends an
> > internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > status from all its subtasks. And for now, we will hardcode the logic
> such
> > that if any source reader says it is under backlog, then
> > execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance
> with
> > the same number of public APIs for the target use-case. And I suppose in
> > the future we might be able to re-use this API for source reader to set
> its
> > backlog status based on its backpressure metrics, which could be an extra
> > advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
>
> Yes, I think that's a viable approach. I would be perfectly fine to not
> introduce
> `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).`
> and sending the `SourceReaderBacklogEvent` from SourceReader to JM
> in this FLIP. It could be implemented once we would decide to add some more
> generic
> ways of detecting backlog/backpressure on the SourceReader level.


> I think we could also just keep the current proposal of adding
> `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the sources
> that
> can set it on the `SplitEnumerator` level. Later we could merge this with
> another
> mechanisms of detecting "isProcessingBacklog", like based on watermark lag,
> backpressure, etc, via some component running on the JM.
>
>
Sounds good! We will keep the FLIP as is. And yes, let's extend the API
when it is needed in the future.


> At the same time I'm fine with having the "isProcessingBacklog" concept to
> switch
> runtime back and forth between high and low latency modes instead of
> "backpressure". In FLIP-325 I have asked:


> > I think there is one thing that hasn't been discussed neither here nor in
> FLIP-309. Given that we have
> > three dimensions:
> > - e2e latency/checkpointing interval
> > - enabling some kind of batching/buffering on the operator level
> > - how much resources we want to allocate to the job
> >
> > How do we want Flink to adjust itself between those three? For example:
> > a) Should we assume that given Job has a fixed amount of assigned
> resources and make it paramount that
> >   Flink doesn't exceed those available resources? So in case of
> backpressure, we
> >   should extend checkpointing intervals, emit records less frequently and
> in batches.
> > b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
> >   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
> >   and wait for the user or autoscaler to scale up the job?
> >
> > In case of a), I think the concept of "isProcessingBacklog" is not
> needed, we could steer the behaviour only
> > using the backpressure information.
> >
> > On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> > we can safely decrease the e2e latency/checkpoint interval even if there
> is no backpressure, to use fewer
> > resources (and let the autoscaler scale down the job).
> >
> > Do we want to have both, or only one of those? Do a) and b) complement
> one another? If job is backpressured,
> > we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> > usage
>
> After thinking this over:
> - the case that we don't have "isProcessingBacklog" information, but the
> source operator is
>   back pressured, must be intermittent. 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-10 Thread Piotr Nowojski
Hi All,

Me and Dong chatted offline about the above mentioned issues (thanks for
that offline chat
I think it helped both of us a lot). The summary is below.

> Previously, I thought you meant to add a generic logic in SourceReaderBase
> to read existing metrics (e.g. backpressure) and emit the
> IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> misunderstood your suggestions.
>
> After double-checking your previous suggestion, I am wondering if you are
> OK with the following approach:
>
> - Add a job-level config execution.checkpointing.interval-during-backlog
> - Add an API SourceReaderContext#setProcessingBacklog(boolean
> isProcessingBacklog).
> - When this API is invoked, it internally sends an
> internal SourceReaderBacklogEvent to SourceCoordinator.
> - SourceCoordinator should keep track of the latest isProcessingBacklog
> status from all its subtasks. And for now, we will hardcode the logic such
> that if any source reader says it is under backlog, then
> execution.checkpointing.interval-during-backlog is used.
>
> This approach looks good to me as it can achieve the same performance with
> the same number of public APIs for the target use-case. And I suppose in
> the future we might be able to re-use this API for source reader to set
its
> backlog status based on its backpressure metrics, which could be an extra
> advantage over the current approach.
>
> Do you think we can agree to adopt the approach described above?

Yes, I think that's a viable approach. I would be perfectly fine to not
introduce
`SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).`
and sending the `SourceReaderBacklogEvent` from SourceReader to JM
in this FLIP. It could be implemented once we would decide to add some more
generic
ways of detecting backlog/backpressure on the SourceReader level.

I think we could also just keep the current proposal of adding
`SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the sources
that
can set it on the `SplitEnumerator` level. Later we could merge this with
other mechanisms of detecting "isProcessingBacklog", like based on watermark lag,
backpressure, etc, via some component running on the JM.
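[Editor's note: as a rough illustration of that proposal, a SplitEnumerator that
knows it is reading historical data could flip the flag roughly as sketched below.
setIsProcessingBacklog is the SplitEnumeratorContext method proposed by FLIP-309;
everything else (the `context` field, the phase-switch method, the overall source)
is a made-up fragment, with the other required enumerator methods omitted.]

    // Fragment of a hypothetical custom SplitEnumerator; `context` is its
    // SplitEnumeratorContext, other required methods are omitted.
    @Override
    public void start() {
        // Historical/snapshot phase: report backlog so the longer
        // checkpointing interval applies.
        context.setIsProcessingBacklog(true);
    }

    private void switchToRealTimePhase() {
        // Caught up with fresh data: switch back to the regular interval.
        context.setIsProcessingBacklog(false);
    }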

At the same time I'm fine with having the "isProcessingBacklog" concept to
switch
runtime back and forth between high and low latency modes instead of
"backpressure". In FLIP-325 I have asked:

> I think there is one thing that hasn't been discussed neither here nor in
FLIP-309. Given that we have
> three dimensions:
> - e2e latency/checkpointing interval
> - enabling some kind of batching/buffering on the operator level
> - how much resources we want to allocate to the job
>
> How do we want Flink to adjust itself between those three? For example:
> a) Should we assume that given Job has a fixed amount of assigned
resources and make it paramount that
>   Flink doesn't exceed those available resources? So in case of
backpressure, we
>   should extend checkpointing intervals, emit records less frequently and
in batches.
> b) Or should we assume that the amount of resources is flexible (up to a
point?), and the desired e2e latency
>   is the paramount aspect? So in case of backpressure, we should still
adhere to the configured e2e latency,
>   and wait for the user or autoscaler to scale up the job?
>
> In case of a), I think the concept of "isProcessingBacklog" is not
needed, we could steer the behaviour only
> using the backpressure information.
>
> On the other hand, in case of b), "isProcessingBacklog" information might
be helpful, to let Flink know that
> we can safely decrease the e2e latency/checkpoint interval even if there
is no backpressure, to use fewer
> resources (and let the autoscaler scale down the job).
>
> Do we want to have both, or only one of those? Do a) and b) complement
one another? If job is backpressured,
> we should follow a) and expose to autoscaler/users information "Hey! I'm
barely keeping up! I need more resources!".
> While, when there is no backpressure and latency doesn't matter
(isProcessingBacklog=true), we can limit the resource
> usage

After thinking this over:
- the case that we don't have "isProcessingBacklog" information, but the
source operator is
  back pressured, must be intermittent. Either back pressure will go away,
or shortly we should
  reach the "isProcessingBacklog" state anyway
- and even if we implement some back pressure detecting algorithm to switch
the runtime into the
  "high latency mode", we can always report that as "isProcessingBacklog"
anyway, as runtime should
   react the same way in both cases (backpressure and "isProcessingBacklog"
   states).

===

With a common understanding of the final solution that we want to have in
the future, I'm pretty much fine with the current
FLIP-309 proposal, with a couple of remarks:
1. Could you include in the FLIP-309 the long term solution as we have
discussed.
a) Would be nice to have some diagram showing how the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Dong Lin
Hi Piotr,

I am sorry if you feel unhappy or upset with us for not following/fixing
your proposal. It is not my intention to give you this feeling. After all,
we are all trying to make Flink better, to support more use-cases with the
most maintainable code. I hope you understand that just like you, I have
also been doing my best to think through various design options and taking
time to evaluate the pros/cons. Eventually, we probably still need to reach
consensus by clearly listing and comparing the objective pros/cons of
different proposals and identifying the best choice.

Regarding your concern (or frustration) that we are always finding issues
in your proposal, I would say it is normal (and probably necessary) for
developers to find pros/cons in each other's solutions, so that we can
eventually pick the right one. I will appreciate anyone who can correctly
pinpoint the concrete issue in my proposal so that I can improve it or
choose an alternative solution.

Regarding your concern that we are not spending enough effort to find
solutions and that the problem in your solution can be solved in a minute,
I would like to say that is not true. For each of your previous proposals,
I typically spent 1+ hours thinking through your proposal to understand
whether it works and why it does not work, and another 1+ hour to write
down the details and explain why it does not work. And I have had a variety
of offline discussions with my colleagues discussing various proposals
(including yours) with 6+ hours in total. Maybe I am not capable enough to
fix those issues in one minute or so so. If you think your proposal can be
easily fixed in one minute or so, I would really appreciate it if you can
think through your proposal and fix it in the first place :)

For your information, I have had several long discussions with my
colleagues at Alibaba and also Becket on this FLIP. We have seriously
considered your proposals and discussed in detail what are the pros/cons
and whether we can improve these solutions. The initial version of this
FLIP (which allows the source operator to specify checkpoint intervals)
does not get enough support due to concerns of not being generic (i.e.
users need to specify checkpoint intervals on a per-source basis). It is
only after I updated the FLIP to use the job-level
execution.checkpointing.interval-during-backlog that they agreed to give +1
to the FLIP. What I want to tell you is that your suggestions have been
taken seriously, and the quality of the FLIP has been taken seriously
by all those who have voted. As a result of taking your suggestion
seriously and trying to find improvements, we updated the FLIP to use
isProcessingBacklog.

I am wondering, do you think it will be useful to discuss face-to-face via
video conference call? It is not just between you and me. We can invite the
developers who are interested to join and help with the discussion. That
might improve communication efficiency and help us understand each other
better :)

I am writing this long email to hopefully get your understanding. I care
much more about the quality of the eventual solution rather than who
proposed the solution. Please bear with me and see my comments inline, with
an explanation of the pros/cons of these proposals.


On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski 
wrote:

> Hi Guys,
>
> I would like to ask you again, to spend a bit more effort on trying to find
> solutions, not just pointing out problems. For 1.5 months,
> the discussion doesn't go in circle, but I'm suggesting a solution, you are
> trying to undermine it with some arguments, I'm coming
> back with a fix, often an extremely easy one, only for you to try to find
> yet another "issue". It doesn't bode well, if you are finding
> a "problem" that can be solved with a minute or so of thinking or even has
> already been solved.
>
> I have provided you so far with at least three distinct solutions that
> could address your exact target use-case. Two [1][2] generic
> enough to be probably good enough for the foreseeable future, one
> intermediate and not generic [3] but which wouldn't
> require @Public API changes or some custom hidden interfaces.


> All in all:
> - [1] with added metric hints like "isProcessingBacklog" solves your target
> use case pretty well. Downside is having to improve
>   how JM is collecting/aggregating metrics
>

Here is my analysis of this proposal compared to the current approach in
the FLIP-309.

pros:
- No need to add the public API
SplitEnumeratorContext#setIsProcessingBacklog.
cons:
- Need to add a public API that subclasses of SourceReader can use to
specify its IsProcessingBacklog metric value.
- Source Coordinator needs to periodically pull the isProcessingBacklog
metrics from all TMs throughout the job execution.

Here is why I think the cons outweigh the pros:
1) JM needs to collect/aggregate metrics with extra runtime overhead, which
is not necessary for the target use-case with the push-based approach in

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Piotr Nowojski
Hi Guys,

I would like to ask you again, to spend a bit more effort on trying to find
solutions, not just pointing out problems. For 1.5 months,
the discussion doesn't go in circles, but I'm suggesting a solution, you are
trying to undermine it with some arguments, I'm coming
back with a fix, often an extremely easy one, only for you to try to find
yet another "issue". It doesn't bode well, if you are finding
a "problem" that can be solved with a minute or so of thinking or even has
already been solved.

I have provided you so far with at least three distinct solutions that
could address your exact target use-case. Two [1][2] generic
enough to be probably good enough for the foreseeable future, one
intermediate and not generic [3] but which wouldn't
require @Public API changes or some custom hidden interfaces.

All in all:
- [1] with added metric hints like "isProcessingBacklog" solves your target
use case pretty well. Downside is having to improve
  how JM is collecting/aggregating metrics
- [2] is basically an equivalent of [1], replacing metrics with events. It
also is a superset of your proposal
- [3] yes, it's hacky, but it's a solution that could be thrown away once
we implement [1] or [2]. The only real theoretical downside is that it cannot
  control the long checkpoint interval exactly (the short checkpoint interval
has to be a divisor of the long checkpoint interval), but I simply cannot
  imagine a practical use case where that would be a blocker for a user.
Please... someone wants to set the short checkpoint interval to 3 minutes and
  the long one to 7 minutes, and that someone cannot accept the long interval
being 9 minutes? And that's even ignoring the fact that if someone has an
  issue with a 3-minute checkpoint interval, I can hardly imagine that merely
doubling the interval to 7 minutes would significantly solve any problem
  for that user.

Dong, a long time ago you wrote:
> Sure. Then let's decide the final solution first.

Have you thought about that? Maybe I'm wrong, but I don't remember you
describing in any of your proposals how they could be extended in the future
to cover more generic cases. Regardless of whether you don't believe in the
generic solution or struggle to grasp it, if you can come back with something
that can be easily extended in the future, up to the point where one could
implement something similar to the backpressure-detecting algorithm that I
have mentioned many times before, I would be happy to discuss and support it.

Hang, about your points 1. and 2., do you think those problems are
insurmountable and blockers for that counter proposal?

> 1. It is hard to find the error checkpoint.

No, it's not; please take a look at what exactly I proposed, and maybe at the
code.

> 2. (...) The failed checkpoint may make them think the job is unhealthy.

Please read again what I wrote in [3]. I'm mentioning there a solution for
this exact "problem".

About the necessity of the config value, I'm still not convinced that's
needed from the start, but yes we can add some config option
if you think otherwise. This option, if named properly, could be re-used in
the future for different solutions, so that's fine by me.

Best,
Piotrek

[1] Introduced in my very first e-mail from 23 May 2023, 16:26, and refined
later with point "2." in my e-mail from 16 June 2023, 17:58
[2] Section "2. ===" in my e-mail from 30 June 2023, 16:34
[3] Section "3. ===" in my e-mail from 30 June 2023, 16:34

All times in CEST.

On Wed, 5 Jul 2023 at 08:46, Hang Ruan wrote:

> Hi, Piotr & Dong.
>
> Thanks for the discussion.
>
> IMO, I do not think the provided counter proposal is a good idea. There are
> some concerns from my side.
>
> 1. It is hard to find the error checkpoint.
> If there are other errors causing the checkpoint failure, we have to check
> every failed checkpoint to find it.
>
> 2. It is more confusing for the users.
> Some users only know the feature, but don't know how we implement it. The
> failed checkpoint may make them think the job is unhealthy.
>
> 3. Users should be able to set the checkpoint interval for the new backlog
> state.
> I think it is better to provide a setting for users to change the
> checkpoint interval in the new backlog state. The hard-coded interval (5x /
> 10x) is not flexible enough.
>
> Best,
> Hang
>
> On Wed, Jul 5, 2023 at 07:33, Dong Lin wrote:
>
> > Hi Piotr,
> >
> > Any suggestion on how we can practically move forward to address the
> target
> > use-case?
> >
> > My understanding is that the current proposal does not have any
> > correctness/performance issues. And it allows the extension to support
> all
> > the extra use-case without having to throw away the proposed APIs.
> >
> > If you prefer to have a better solution with simpler APIs and yet same or
> > better correctness/performance for the target use-case, could you please
> > kindly explain its API design so that we can continue the discussion?
> >
> >
> > Best,
> > Dong
> >
> > On Mon, Jul 3, 2023 at 6:39 PM Dong Lin  

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Hang Ruan
Hi, Piotr & Dong.

Thanks for the discussion.

IMO, I do not think the provided counter proposal is a good idea. There are
some concerns from my side.

1. It is hard to find the error checkpoint.
If there are other errors causing the checkpoint failure, we have to check
every failed checkpoint to find it.

2. It is more confusing for the users.
Some users only know the feature, but don't know how we implement it. The
failed checkpoint may make them think the job is unhealthy.

3. Users should be able to set the checkpoint interval for the new backlog
state.
I think it is better to provide a setting for users to change the
checkpoint interval in the new backlog state. The hard-coded interval (5x /
10x) is not flexible enough.

Best,
Hang


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-04 Thread Dong Lin
Hi Piotr,

Any suggestion on how we can practically move forward to address the target
use-case?

My understanding is that the current proposal does not have any
correctness/performance issues. And it allows the extension to support all
the extra use-case without having to throw away the proposed APIs.

If you prefer to have a better solution with simpler APIs and yet same or
better correctness/performance for the target use-case, could you please
kindly explain its API design so that we can continue the discussion?


Best,
Dong


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-03 Thread Dong Lin
Hi Piotr,

Please see my comments inline.

On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Starting from the end:
>
> > It seems that the only benefit of this approach is to avoid
> > adding SplitEnumeratorContext#setIsProcessingBacklog.
>
> Yes, that's the major benefit of this counter-proposal.
>
> > In the target use-case, user still want to do checkpoint (though at a
> > larger interval) when there is backlog. And HybridSource need to know the
> > expected checkpoint interval during backlog in order to determine whether
> > it should keep throwing CheckpointException. Thus, we still need to add
> > execution.checkpointing.interval-during-backlog for user to specify this
> > information.
> >
> > The downside of this approach is that it is hard to enforce the
> > semantics specified by execution.checkpointing.interval-during-backlog. For
> > example, suppose execution.checkpointing.interval =3 minute and
> > execution.checkpointing.interval-during-backlog = 7 minutes. During the
> > backlog phase, checkpoint coordinator will still trigger the checkpoint
> > once every 3 minutes. HybridSource will need to reject 2 out of the 3
> > checkpoint invocation, and the effective checkpoint interval will be 9
> > minutes.
>
> Does it really matter what's the exact value of the longer interval? Can we
> not hard-code it to be 5x or 10x of the base checkpoint interval? If there
> is a noticeable overhead from the base interval slowing down the record
> processing rate, reducing this interval by a factor of 5x or 10x would fix
> the performance issue for the vast majority of users. So a source could
> just skip 4 out of 5 or 9 out of 10 checkpoints.
>

Yes, I think the exact value of the longer interval matters.

The main reason we need two intervals is for jobs that have a two-phase
commit sink. The short interval typically represents the interval that a
user can accept for the two-phase commit sink to buffer data (since it can
only emit data when a checkpoint is triggered). And the long interval
typically represents the maximum amount of duplicate work (in terms of
time) that a job needs to re-do after a failover.

Since there is no intrinsic relationship between the data buffering interval
(related to processing latency) and the failover boundary, I don't think we
can hardcode it to be 5x or 10x of the base checkpoint interval.
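To make the two requirements concrete, here is a minimal sketch of how a user could express them, assuming the execution.checkpointing.interval-during-backlog option proposed by this FLIP (not available in released Flink); the concrete values are arbitrary examples.

// Sketch: the interval-during-backlog option is only proposed by FLIP-309;
// the values are arbitrary examples.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoIntervalSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Short interval: how long a two-phase-commit sink may buffer data
        // before a checkpoint lets it emit (data-freshness requirement).
        conf.setString("execution.checkpointing.interval", "30 s");
        // Long interval: bounds the duplicate work re-done after a failover
        // while the job is still processing backlog.
        conf.setString("execution.checkpointing.interval-during-backlog", "10 min");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define sources, transformations and sinks as usual ...
        env.execute("two-interval checkpointing sketch");
    }
}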


> Alternatively we could introduce a config option like:
>
> execution.checkpointing.long-interval
>
> that might be re-used in the future, with more fancy algorithms, but I
> don't see
> much value in doing that.


> > Overall, I think the solution is a bit hacky. I think it is preferred to
> > throw exception only when there is indeed error. If we don't need to check
> > a checkpoint, it is preferred to not trigger the checkpoint in the first
> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is
> > probably not that much of a big deal.
>
> Yes it's hacky, but at least it doesn't require extending the Public API
> for a
> quite limited solution, that only targets one or two sources that are
> rarely used.
>

I am not sure it is fair to say MySQL CDC source is "rarely used".
ververica/flink-cdc-connectors GitHub repo has 4K+ stars. Also, note that
the proposed feature can be useful for CDC sources with an internal
"backlog phase". Its usage is not limited to just the two sources mentioned
in the FLIP.


>
> 
>
> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
> feeling that
> this is overly complicated and would require every operator/function to
> handle that.
>
> Yes it would cover even more use cases, at the cost of complicating the
> system by
> a lot. IMO it looks like something we could do if there would indeed be a
> high
> demand of such a feature, after we provide some baseline generic solution,
> that
> doesn't require any configuration.
>
> I have a feeling that by just statically looking at the shape of the job
> graph and how
> it is connected, we could deduce almost the same things.
>

Note that pretty much every FLIP will address my use-case at the cost
of complicating the system. I understand you have the feeling that this
complexity is not worthwhile. However, as we can see from the comments in
this thread and the votes in the voting thread, many
committers/developers/users actually welcome the feature introduced in this
FLIP.

I am happy to work with you together to find a more generic and simpler
solution, as long as that solution can address the target use-case without
hurting user-experience. The alternative solution which you have mentioned
so far, unfortunately, still has drawbacks as mentioned earlier.


> Also:
>
> >  - the FLIP suggests to use the long checkpointing interval as long as
> any subtask is processing the backlog. Are you sure that's the right call?
> What if other
> >  sources are producing fresh records, and those fresh records are
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-03 Thread Piotr Nowojski
Hi Dong,

Starting from the end:

> It seems that the only benefit of this approach is to avoid
> adding SplitEnumeratorContext#setIsProcessingBacklog.

Yes, that's the major benefit of this counter-proposal.

> In the target use-case, user still want to do checkpoint (though at a
> larger interval) when there is backlog. And HybridSource need to know the
> expected checkpoint interval during backlog in order to determine whether
> it should keep throwing CheckpointException. Thus, we still need to add
> execution.checkpointing.interval-during-backlog for user to specify this
> information.
>
> The downside of this approach is that it is hard to enforce the
> semantics specified by execution.checkpointing.interval-during-backlog. For
> example, suppose execution.checkpointing.interval =3 minute and
> execution.checkpointing.interval-during-backlog = 7 minutes. During the
> backlog phase, checkpoint coordinator will still trigger the checkpoint
> once every 3 minutes. HybridSource will need to reject 2 out of the 3
> checkpoint invocation, and the effective checkpoint interval will be 9
> minutes.

Does it really matter what's the exact value of the longer interval? Can we
not hard-code it to be 5x or 10x of the base checkpoint interval? If there is
a noticeable overhead from the base interval slowing down the record
processing rate, reducing this interval by a factor of 5x or 10x would fix
the performance issue for the vast majority of users. So a source could just
skip 4 out of 5 or 9 out of 10 checkpoints.

Alternatively we could introduce a config option like:

execution.checkpointing.long-interval

that might be re-used in the future, with more fancy algorithms, but I
don't see
much value in doing that.

> Overall, I think the solution is a bit hacky. I think it is preferred to
> throw exception only when there is indeed error. If we don't need to check
> a checkpoint, it is preferred to not trigger the checkpoint in the first
> place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is
> probably not that much of a big deal.

Yes it's hacky, but at least it doesn't require extending the Public API
for a
quite limited solution, that only targets one or two sources that are
rarely used.



About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
feeling that
this is overly complicated and would require every operator/function to
handle that.

Yes it would cover even more use cases, at the cost of complicating the
system by
a lot. IMO it looks like something we could do if there would indeed be a
high
demand of such a feature, after we provide some baseline generic solution,
that
doesn't require any configuration.

I have a feeling that by just statically looking at the shape of the job
graph and how
it is connected, we could deduce almost the same things.

Also:

>  - the FLIP suggests to use the long checkpointing interval as long as
any subtask is processing the backlog. Are you sure that's the right call?
What if other
>  sources are producing fresh records, and those fresh records are
reaching sinks? It could happen either with disjoint JobGraph, embarrassing
parallel
>  JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records can
slip using a not backpressured input channel through generally backpressured
>  keyBy exchange. How should we handle that? This problem I think will
affect every solution, including my previously proposed generic one, but we
should
>  discuss how to handle that as well.

By this I didn't necessarily mean that we have to solve it right now.



> The comments above seem kind of "abstract". I am hoping to understand more
> technical details behind these comments so that we can see how to address
> the concern.

Over the span of this discussion I think I have already explained many
times what
bothers me in the current proposal.

> For example, even if a FLIP does not address all use-cases
> (which is arguably true for every FLIP), its solution does not necessarily
> need to be thrown away later as long as it is extensible

That's my main point. I haven't yet seen how proposals from this FLIP, that
could
extend FLIP-309 to cover the generic use case and:
 - Would work out of the box, for all or majority of the properly
implemented sources.
 - Would require zero or very minimal configuration/input from the user.
Especially
   wouldn't require implementing some custom things in every source.
 - Could be made to work well enough in the (vast?) majority of the use
cases.

> So we probably need to understand specifically why the proposed APIs
would be thrown away.

As I have mentioned many times, that's the case because:
1. This solution is not generic enough
2. I can see solutions that wouldn't require modification of every source
3. They would have zero overlap with the interfaces extension from this
FLIP

Best,
Piotrek

On Sat, 1 Jul 2023 at 17:01, Dong Lin wrote:

> Hi Piotr,
>
> Thank you for 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-01 Thread Dong Lin
Hi Piotr,

Thank you for providing further suggestions to help improve the API. Please
see my comments inline.

On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski 
wrote:

> Hey,
>
> Sorry for a late reply, I was OoO for a week. I have three things to point
> out.
>
> 1. ===
>
> The updated proposal is indeed better, but to be honest I still don't like
> it, for mostly the same reasons that I have mentioned earlier:
> - only a partial solution, that doesn't address all use cases, so we would
> need to throw it away sooner or later
> - I don't see and it hasn't been discussed how to make it work out of the
> box for all sources
> - somehow complicating API for people implementing Sources
> - it should work out of the box for most of the sources, or at least to
> have that potential in the future
>

The comments above seem kind of "abstract". I am hoping to understand more
technical details behind these comments so that we can see how to address
the concern. For example, even if a FLIP does not address all use-cases
(which is arguably true for every FLIP), its solution does not necessarily
need to be thrown away later as long as it is extensible. So we probably
need to understand specifically why the proposed APIs would be thrown away.

Similarly, we would need to understand if there is a better design to make
the API simpler and work out of the box etc. in order to decide how to
address these comments.


> On top of that:
> - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> CheckpointCoordinator to pass "isProcessingBacklog"
>

I think it can be passed via the following function chain:
- CheckpointCoordinator invokes
OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
coordinatorsToCheckpoint) to get this information.
- OperatorCoordinatorHolder implements
OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
OperatorCoordinator#isProcessingBacklog (via coordinator)
- SourceCoordinator implements OperatorCoordinator#isProcessingBacklog and
returns SourceCoordinatorContext#isProcessingBacklog
- SourceCoordinatorContext will implement
SplitEnumeratorContext#setIsProcessingBacklog and stores the given
information in a variable.
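Here is a rough sketch of that chain; the class names below are simplified stand-ins for the real Flink internals, and the isProcessingBacklog accessors are part of the proposal, not existing code.

import java.time.Duration;
import java.util.List;

// Simplified stand-ins for the proposed accessors along the chain
// CheckpointCoordinator -> OperatorCoordinatorCheckpointContext
// -> OperatorCoordinatorHolder -> SourceCoordinator -> SourceCoordinatorContext.
interface BacklogStatusProvider {
    boolean isProcessingBacklog();
}

// Stand-in for SourceCoordinatorContext: stores the flag set via the proposed
// SplitEnumeratorContext#setIsProcessingBacklog.
class SourceCoordinatorContextSketch implements BacklogStatusProvider {
    private volatile boolean processingBacklog;

    void setIsProcessingBacklog(boolean value) {
        this.processingBacklog = value;
    }

    @Override
    public boolean isProcessingBacklog() {
        return processingBacklog;
    }
}

// Stand-in for the CheckpointCoordinator side: picks the interval for the next
// periodic trigger by asking every source coordinator for its backlog status.
class CheckpointCoordinatorSketch {
    private final List<BacklogStatusProvider> coordinatorsToCheckpoint;
    private final Duration regularInterval;
    private final Duration intervalDuringBacklog;

    CheckpointCoordinatorSketch(
            List<BacklogStatusProvider> coordinatorsToCheckpoint,
            Duration regularInterval,
            Duration intervalDuringBacklog) {
        this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
        this.regularInterval = regularInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    Duration nextTriggerInterval() {
        boolean anyBacklog =
                coordinatorsToCheckpoint.stream()
                        .anyMatch(BacklogStatusProvider::isProcessingBacklog);
        return anyBacklog ? intervalDuringBacklog : regularInterval;
    }
}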

Note that this involves only internal APIs. We might be able to find a simpler
solution with fewer functions on the path. As long as the above solution
works without any performance or correctness issues, I think maybe we
should focus on the public API design and discuss the implementation details
in the PR review.

- the FLIP suggests to use the long checkpointing interval as long as any
> subtask is processing the backlog. Are you sure that's the right call? What
> if other
>   sources are producing fresh records, and those fresh records are reaching
> sinks? It could happen either with disjoint JobGraph, embarrassing parallel
>   JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records can
> slip using a not backpressured input channel through generally
> backpressured
>   keyBy exchange. How should we handle that? This problem I think will
> affect every solution, including my previously proposed generic one, but we
> should
>   discuss how to handle that as well.
>

Good question. Here is my plan to improve the solution in a follow-up FLIP:

- Let every subtask of every source operator emit
RecordAttributes(isBacklog=..)
- Let every subtask of every operator handle the RecordAttributes received
from inputs and emit RecordAttributes to downstream operators. Flink
runtime can derive this information for every one-input operator. For an
operator with two inputs, if one input has isBacklog=true and the other has
isBacklog=false, the operator should determine the isBacklog for its output
records based on its semantics.
- If there exists a subtask of a two-phase commit operator with
isBacklog=false, the operator should let JM know so that the JM will use
the short checkpoint interval (for data freshness). Otherwise, JM will use
the long checkpoint interval.

The above solution guarantees that, if every two-input operator has
explicitly specified their isBacklog based on the inputs' isBacklog, then
the JM will use the short checkpoint interval if and only if it is useful
for at least one subtask of one two-phase commit operator.
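To illustrate the two-input case, here is a very rough sketch of how an operator could combine the isBacklog attribute of its inputs. RecordAttributes and the propagation hooks are part of the follow-up proposal, so every type below is a placeholder, and the OR-combination is just one possible default.

// Placeholder types: RecordAttributes and the propagation hooks do not exist
// in released Flink; this only illustrates one way to combine two inputs.
final class RecordAttributesSketch {
    final boolean isBacklog;

    RecordAttributesSketch(boolean isBacklog) {
        this.isBacklog = isBacklog;
    }
}

final class TwoInputBacklogCombiner {
    private boolean input1Backlog;
    private boolean input2Backlog;

    RecordAttributesSketch onInput1Attributes(RecordAttributesSketch attrs) {
        input1Backlog = attrs.isBacklog;
        return combined();
    }

    RecordAttributesSketch onInput2Attributes(RecordAttributesSketch attrs) {
        input2Backlog = attrs.isBacklog;
        return combined();
    }

    // Default used here: output records are "fresh" only when both inputs are
    // fresh; an operator with different semantics (e.g. a temporal join) could
    // combine the flags differently.
    private RecordAttributesSketch combined() {
        return new RecordAttributesSketch(input1Backlog || input2Backlog);
    }
}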

Note that even the above solution might not be perfect. Suppose one subtask
of the two-phase commit operator has isBacklog=false, but every other subtask
of this operator has isBacklog=true due to load imbalance. In this case, it
might be beneficial to use the long checkpoint interval to improve the average
data freshness for this operator. However, as we get into more edge cases, the
solution will become more complicated (e.g. providing more APIs for users to
specify their intended strategy) and there will be fewer additional benefits
(because these scenarios are less common).

Also, note that we can support the solution described above without
throwing away any public API currently proposed in 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-30 Thread Piotr Nowojski
Hey,

Sorry for a late reply, I was OoO for a week. I have three things to point
out.

1. ===

The updated proposal is indeed better, but to be honest I still don't like
it, for mostly the same reasons that I have mentioned earlier:
- only a partial solution, that doesn't address all use cases, so we would
need to throw it away sooner or later
- I don't see and it hasn't been discussed how to make it work out of the
box for all sources
- somehow complicating API for people implementing Sources
- it should work out of the box for most of the sources, or at least to
have that potential in the future

On top of that:
- the FLIP I think is missing how to hook up SplitEnumeratorContext and
CheckpointCoordinator to pass "isProcessingBacklog"
- the FLIP suggests to use the long checkpointing interval as long as any
subtask is processing the backlog. Are you sure that's the right call? What
if other
  sources are producing fresh records, and those fresh records are reaching
sinks? It could happen either with disjoint JobGraph, embarrassing parallel
  JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records can
slip using a not backpressured input channel through generally backpressured
  keyBy exchange. How should we handle that? This problem I think will
affect every solution, including my previously proposed generic one, but we
should
  discuss how to handle that as well.

2. ===

Regarding the current proposal, there might be a way to make it actually
somehow generic (but not pluggable). But it might require slightly different
interfaces. We could keep the idea that SourceCoordinator/SplitEnumerator
is responsible for switching between slow/fast processing modes. It could be
implemented to achieve something like in the FLIP-309 proposal, but apart
of that, the default behaviour would be a built in mechanism working like
this:
1. Every SourceReaderBase checks its metrics and its state, to decide if it
considers itself as "processingBacklog" or "veryBackpressured". The base
implementation could do it via a similar mechanism as I was proposing
previously, via looking at the busy/backPressuredTimeMsPerSecond,
pendingRecords and processing rate.
2. SourceReaderBase could send an event with
"processingBacklog"/"veryBackpressured" state.
3. SourceCoordinator would collect those events and decide what it should do,
i.e. whether it should switch the whole source to the
"processingBacklog"/"veryBackpressured" state or not.

That could eventually provide a generic solution that works for every
source that reports the required metrics. Each source implementation could
decide whether to use that default behaviour, whether it's better to override
the default, or whether to combine the default with something custom (like
HybridSource).
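A rough sketch of what the reader-side decision in steps 1-2 could look like; the thresholds and the three-state classification are assumptions, while the metric names (pendingRecords, backPressuredTimeMsPerSecond) come from Flink's existing source/task metrics.

// Sketch only: thresholds and the classification below are assumptions; only
// the metric names are taken from Flink's existing metric definitions.
final class ReaderBacklogEstimatorSketch {

    enum State { NORMAL, PROCESSING_BACKLOG, VERY_BACKPRESSURED }

    private final long pendingRecordsThreshold;          // assumed threshold
    private final double backPressuredMsPerSecThreshold; // assumed threshold

    ReaderBacklogEstimatorSketch(
            long pendingRecordsThreshold, double backPressuredMsPerSecThreshold) {
        this.pendingRecordsThreshold = pendingRecordsThreshold;
        this.backPressuredMsPerSecThreshold = backPressuredMsPerSecThreshold;
    }

    // Decide the state this reader would report to the SourceCoordinator,
    // based on the latest metric values sampled by the reader.
    State estimate(long pendingRecords, double backPressuredTimeMsPerSecond) {
        if (backPressuredTimeMsPerSecond > backPressuredMsPerSecThreshold) {
            return State.VERY_BACKPRESSURED;
        }
        if (pendingRecords > pendingRecordsThreshold) {
            return State.PROCESSING_BACKLOG;
        }
        return State.NORMAL;
    }
}

The resulting state could then be wrapped in a custom SourceEvent and sent to the coordinator via the existing SourceReaderContext#sendSourceEventToCoordinator, which is roughly what step 2 above describes.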

And as a first step, we could implement that mechanism only on the
SourceCoordinator side, without events, without the default generic
solution and use
it in the HybridSource/MySQL CDC.

This approach has some advantages compared to my previous proposal:
  + no need to tinker with metrics and pushing metrics from TMs to JM
  + somehow communicating this information via Events seems a bit cleaner
to me and avoids problems with freshness of the metrics
And some issues:
  - I don't know if it can be made pluggable in the future, i.e. whether a user
could implement a custom `CheckpointTrigger` that would automatically work
with all/most of the pre-existing sources.
  - I don't know if it can be expanded in the future, if needed, to make
decisions based on operators in the middle of a job graph.

3. ===

Independent of that, during some brainstorming between me, Chesnay and
Stefan Richter, an idea popped up that I think could be a counter proposal,
as an intermediate solution that probably effectively works the same way as
the current FLIP-309.

Inside a HybridSource, from its SplitEnumerator#snapshotState method, could
you not throw an exception like
`new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or `new
CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
Actually, we could also introduce a dedicated `CheckpointFailureReason` for
that purpose and handle it in some special way in some places (like maybe hide
such rejected checkpoints from the REST API/WebUI). We could elaborate on
this a bit more, but after brief thought I could see it actually working well
enough without any public-facing changes. But I might be wrong here.
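A minimal sketch of that idea, assuming it is acceptable for an enumerator to reject a checkpoint by throwing from snapshotState (it reaches into flink-runtime classes, which is part of why it is hacky); the skip factor of 5 is arbitrary.

// Sketch of the "reject most checkpoints during backlog" hack discussed above.
// CheckpointException and CheckpointFailureReason are existing flink-runtime
// classes; everything else here is a placeholder.
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;

class BacklogPhaseEnumeratorSketch {

    private boolean processingBacklog = true; // e.g. still in the bounded phase
    private long checkpointTriggers;

    // Would be called from the enumerator's snapshotState(checkpointId).
    EnumeratorCheckpointSketch snapshotState(long checkpointId) throws Exception {
        checkpointTriggers++;
        if (processingBacklog && checkpointTriggers % 5 != 0) {
            // Reject 4 out of 5 triggers while catching up, so the effective
            // checkpoint interval becomes roughly 5x the configured one.
            throw new CheckpointException(
                    CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS);
        }
        return new EnumeratorCheckpointSketch();
    }

    static final class EnumeratorCheckpointSketch {} // placeholder checkpoint type
}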

If this feature actually gains traction, we could expand it into something
more sophisticated, available via a public API, in the future.

===

Sorry for disturbing this FLIP discussion and voting.

Best,
Piotrek


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread feng xiangyu
Hi Dong,

Thanks for your quick reply. I think this has truly solved our problem and
will enable us to upgrade our existing jobs more seamlessly.

Best,
Xiangyu


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Thanks everyone (and specifically Piotr) for your valuable suggestions and
review!

We will open the voting thread for this FLIP. We hope to make this feature
available in the Flink 1.18 release, whose feature freeze is on July 11.

Piotr: we will create a follow-up FLIP (probably FLIP-328) to allow users to
determine isBacklog dynamically based on the event-time lag and/or source
backpressure metrics.
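As a rough illustration of that FLIP-328 direction, the per-source decision could boil down to comparing the watermark lag against a user-defined threshold; the threshold and the way the flag would be reported are assumptions.

// Sketch only: the threshold and the reporting path are assumptions; FLIP-328
// is still a proposal at this point.
final class WatermarkLagBacklogDetectorSketch {

    private final long maxAllowedLagMillis; // e.g. 5 minutes, chosen by the user

    WatermarkLagBacklogDetectorSketch(long maxAllowedLagMillis) {
        this.maxAllowedLagMillis = maxAllowedLagMillis;
    }

    // isProcessingBacklog = the source's watermark lags too far behind real time.
    boolean isProcessingBacklog(long currentWatermarkMillis, long nowMillis) {
        return (nowMillis - currentWatermarkMillis) > maxAllowedLagMillis;
    }
}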




Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Hi Feng,

Thanks for the feedback. Yes, you can configure the
execution.checkpointing.interval-during-backlog to effectively disable
checkpoint during backlog.

Prior to your comment, the FLIP allows users to do this by setting the
config value to something large (e.g. 365 day). After thinking about this
more, we think it is more usable to allow users to achieve this goal by
setting the config value to 0. This is consistent with the existing
behavior of execution.checkpointing.interval -- the checkpoint is disabled
if the user sets execution.checkpointing.interval to 0.

We have updated the description of
execution.checkpointing.interval-during-backlog
to say the following:
... it is not null, the value must either be 0, which means the checkpoint
is disabled during backlog, or be larger than or equal to
execution.checkpointing.interval.
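A small sketch of the validation rule quoted above (the option name is the one proposed in the FLIP; the check itself is only an illustration):

import java.time.Duration;

final class BacklogIntervalValidationSketch {

    // The rule described above: the backlog interval must be 0 (checkpointing
    // disabled during backlog) or at least the regular checkpoint interval.
    static void validate(Duration interval, Duration intervalDuringBacklog) {
        boolean valid =
                intervalDuringBacklog.isZero()
                        || intervalDuringBacklog.compareTo(interval) >= 0;
        if (!valid) {
            throw new IllegalArgumentException(
                    "execution.checkpointing.interval-during-backlog must be 0 "
                            + "or >= execution.checkpointing.interval");
        }
    }
}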

Does this address your need?

Best,
Dong




Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread feng xiangyu
Hi Dong and Yunfeng,

Thanks for the proposal, your FLIP sounds very useful from my perspective.
In our business, when we use the hybrid source in production we also hit the
problem described in your FLIP.
In our solution, we tend to skip making any checkpoints before all batch
tasks have finished and resume the periodic checkpoint only in the streaming
phase. With this FLIP, we can solve our problem in a more generic way.

However, I am wondering: if we still want to skip making any checkpoints
during the historical phase, can we set the configuration
"execution.checkpointing.interval-during-backlog" to "-1" to cover this
case?

Best,
Xiangyu


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Hang Ruan
Thanks for Dong and Yunfeng's work.

The FLIP looks good to me. This new version is easier to understand.

Best,
Hang

On Tue, Jun 27, 2023 at 16:53, Dong Lin wrote:

> Thanks Jack, Jingsong, and Zhu for the review!
>
> Thanks Zhu for the suggestion. I have updated the configuration name as
> suggested.
>
> On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:
>
> > Thanks Dong and Yunfeng for creating this FLIP and driving this
> discussion.
> >
> > The new design looks generally good to me. Increasing the checkpoint
> > interval when the job is processing backlogs is easier for users to
> > understand and can help in more scenarios.
> >
> > I have one comment about the new configuration.
> > Naming the new configuration
> > "execution.checkpointing.interval-during-backlog" would be better
> > according to Flink config naming convention.
> > It is also because that nested config keys should be avoided. See
> > FLINK-29372 for more details.
> >
> > Thanks,
> > Zhu
> >
> > > On Tue, Jun 27, 2023 at 15:45, Jingsong Li wrote:
> > >
> > > Looks good to me!
> > >
> > > Thanks Dong, Yunfeng and all for your discussion and design.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> > > >
> > > > Thank you Dong for driving this FLIP.
> > > >
> > > > The new design looks good to me!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > > 2023年6月27日 14:38,Dong Lin  写道:
> > > > >
> > > > > Thank you Leonard for the review!
> > > > >
> > > > > Hi Piotr, do you have any comments on the latest proposal?
> > > > >
> > > > > I am wondering if it is OK to start the voting thread this week.
> > > > >
> > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu 
> > wrote:
> > > > >
> > > > >> Thanks Dong for driving this FLIP forward!
> > > > >>
> > > > >> Introducing  `backlog status` concept for flink job makes sense to
> > me as
> > > > >> following reasons:
> > > > >>
> > > > >> From concept/API design perspective, it’s more general and natural
> > than
> > > > >> above proposals as it can be used in HybridSource for bounded
> > records, CDC
> > > > >> Source for history snapshot and general sources like KafkaSource
> for
> > > > >> historical messages.
> > > > >>
> > > > >> From user cases/requirements, I’ve seen many users manually to set
> > larger
> > > > >> checkpoint interval during backfilling and then set a shorter
> > checkpoint
> > > > >> interval for real-time processing in their production environments
> > as a
> > > > >> flink application optimization. Now, the flink framework can make
> > this
> > > > >> optimization no longer require the user to set the checkpoint
> > interval and
> > > > >> restart the job multiple times.
> > > > >>
> > > > >> Following supporting using larger checkpoint for job under backlog
> > status
> > > > >> in current FLIP, we can explore supporting larger
> > parallelism/memory/cpu
> > > > >> for job under backlog status in the future.
> > > > >>
> > > > >> In short, the updated FLIP looks good to me.
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >> Leonard
> > > > >>
> > > > >>
> > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin 
> > wrote:
> > > > >>>
> > > > >>> Hi Piotr,
> > > > >>>
> > > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > > >>>
> > > > >>> After discussing with Becket Qin and thinking about this more, I
> > agree it
> > > > >>> is a better idea to add a top-level concept to all source
> > operators to
> > > > >>> address the target use-case.
> > > > >>>
> > > > >>> The main reason that changed my mind is that isProcessingBacklog
> > can be
> > > > >>> described as an inherent/nature attribute of every source
> instance
> > and
> > > > >> its
> > > > >>> semantics does not need to depend on any specific checkpointing
> > policy.
> > > > >>> Also, we can hardcode the isProcessingBacklog behavior for the
> > sources we
> > > > >>> have considered so far (e.g. HybridSource and MySQL CDC source)
> > without
> > > > >>> asking users to explicitly configure the per-source behavior,
> which
> > > > >> indeed
> > > > >>> provides better user experience.
> > > > >>>
> > > > >>> I have updated the FLIP based on the latest suggestions. The
> > latest FLIP
> > > > >> no
> > > > >>> longer introduces per-source config that can be used by
> end-users.
> > While
> > > > >> I
> > > > >>> agree with you that CheckpointTrigger can be a useful feature to
> > address
> > > > >>> additional use-cases, I am not sure it is necessary for the
> > use-case
> > > > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger
> > separately
> > > > >>> in another FLIP?
> > > > >>>
> > > > >>> Can you help take another look at the updated FLIP?
> > > > >>>
> > > > >>> Best,
> > > > >>> Dong
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
> > pnowoj...@apache.org>
> > > > >>> wrote:
> > > > >>>
> > > >  Hi Dong,
> > > > 
> > > > > Suppose there are 1000 subtask and each subtask has 1% chance
> of
> > 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thanks Jark, Jingsong, and Zhu for the review!

Thanks Zhu for the suggestion. I have updated the configuration name as
suggested.

On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:

> Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.
>
> The new design looks generally good to me. Increasing the checkpoint
> interval when the job is processing backlogs is easier for users to
> understand and can help in more scenarios.
>
> I have one comment about the new configuration.
> Naming the new configuration
> "execution.checkpointing.interval-during-backlog" would be better
> according to Flink config naming convention.
> It is also because that nested config keys should be avoided. See
> FLINK-29372 for more details.
>
> Thanks,
> Zhu
>
> Jingsong Li  于2023年6月27日周二 15:45写道:
> >
> > Looks good to me!
> >
> > Thanks Dong, Yunfeng and all for your discussion and design.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> > >
> > > Thank you Dong for driving this FLIP.
> > >
> > > The new design looks good to me!
> > >
> > > Best,
> > > Jark
> > >
> > > > 2023年6月27日 14:38,Dong Lin  写道:
> > > >
> > > > Thank you Leonard for the review!
> > > >
> > > > Hi Piotr, do you have any comments on the latest proposal?
> > > >
> > > > I am wondering if it is OK to start the voting thread this week.
> > > >
> > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu 
> wrote:
> > > >
> > > >> Thanks Dong for driving this FLIP forward!
> > > >>
> > > >> Introducing  `backlog status` concept for flink job makes sense to
> me as
> > > >> following reasons:
> > > >>
> > > >> From concept/API design perspective, it’s more general and natural
> than
> > > >> above proposals as it can be used in HybridSource for bounded
> records, CDC
> > > >> Source for history snapshot and general sources like KafkaSource for
> > > >> historical messages.
> > > >>
> > > >> From user cases/requirements, I’ve seen many users manually to set
> larger
> > > >> checkpoint interval during backfilling and then set a shorter
> checkpoint
> > > >> interval for real-time processing in their production environments
> as a
> > > >> flink application optimization. Now, the flink framework can make
> this
> > > >> optimization no longer require the user to set the checkpoint
> interval and
> > > >> restart the job multiple times.
> > > >>
> > > >> Following supporting using larger checkpoint for job under backlog
> status
> > > >> in current FLIP, we can explore supporting larger
> parallelism/memory/cpu
> > > >> for job under backlog status in the future.
> > > >>
> > > >> In short, the updated FLIP looks good to me.
> > > >>
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >>
> > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin 
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > >>>
> > > >>> After discussing with Becket Qin and thinking about this more, I
> agree it
> > > >>> is a better idea to add a top-level concept to all source
> operators to
> > > >>> address the target use-case.
> > > >>>
> > > >>> The main reason that changed my mind is that isProcessingBacklog
> can be
> > > >>> described as an inherent/nature attribute of every source instance
> and
> > > >> its
> > > >>> semantics does not need to depend on any specific checkpointing
> policy.
> > > >>> Also, we can hardcode the isProcessingBacklog behavior for the
> sources we
> > > >>> have considered so far (e.g. HybridSource and MySQL CDC source)
> without
> > > >>> asking users to explicitly configure the per-source behavior, which
> > > >> indeed
> > > >>> provides better user experience.
> > > >>>
> > > >>> I have updated the FLIP based on the latest suggestions. The
> latest FLIP
> > > >> no
> > > >>> longer introduces per-source config that can be used by end-users.
> While
> > > >> I
> > > >>> agree with you that CheckpointTrigger can be a useful feature to
> address
> > > >>> additional use-cases, I am not sure it is necessary for the
> use-case
> > > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger
> separately
> > > >>> in another FLIP?
> > > >>>
> > > >>> Can you help take another look at the updated FLIP?
> > > >>>
> > > >>> Best,
> > > >>> Dong
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > >>> wrote:
> > > >>>
> > >  Hi Dong,
> > > 
> > > > Suppose there are 1000 subtask and each subtask has 1% chance of
> being
> > > > "backpressured" at a given time (due to random traffic spikes).
> Then at
> > >  any
> > > > given time, the chance of the job
> > > > being considered not-backpressured = (1-0.01)^1000. Since we
> evaluate
> > > >> the
> > > > backpressure metric once a second, the estimated time for the job
> > > > to be considered not-backpressured is roughly 1 /
> ((1-0.01)^1000) =
> > > >> 23163
> > > > sec = 6.4 hours.
> > > 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Zhu Zhu
Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.

The new design looks generally good to me. Increasing the checkpoint
interval when the job is processing backlogs is easier for users to
understand and can help in more scenarios.

I have one comment about the new configuration.
Naming the new configuration
"execution.checkpointing.interval-during-backlog" would be better
according to the Flink config naming convention.
This is also because nested config keys should be avoided; see
FLINK-29372 for more details.
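
For illustration, with this naming the new option would sit next to the
existing checkpoint interval option in the job configuration roughly as
follows (the values below are made up, not recommendations):

execution.checkpointing.interval: 30s
execution.checkpointing.interval-during-backlog: 10min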

Thanks,
Zhu

Jingsong Li  于2023年6月27日周二 15:45写道:
>
> Looks good to me!
>
> Thanks Dong, Yunfeng and all for your discussion and design.
>
> Best,
> Jingsong
>
> On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> >
> > Thank you Dong for driving this FLIP.
> >
> > The new design looks good to me!
> >
> > Best,
> > Jark
> >
> > > 2023年6月27日 14:38,Dong Lin  写道:
> > >
> > > Thank you Leonard for the review!
> > >
> > > Hi Piotr, do you have any comments on the latest proposal?
> > >
> > > I am wondering if it is OK to start the voting thread this week.
> > >
> > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> > >
> > >> Thanks Dong for driving this FLIP forward!
> > >>
> > >> Introducing  `backlog status` concept for flink job makes sense to me as
> > >> following reasons:
> > >>
> > >> From concept/API design perspective, it’s more general and natural than
> > >> above proposals as it can be used in HybridSource for bounded records, 
> > >> CDC
> > >> Source for history snapshot and general sources like KafkaSource for
> > >> historical messages.
> > >>
> > >> From user cases/requirements, I’ve seen many users manually to set larger
> > >> checkpoint interval during backfilling and then set a shorter checkpoint
> > >> interval for real-time processing in their production environments as a
> > >> flink application optimization. Now, the flink framework can make this
> > >> optimization no longer require the user to set the checkpoint interval 
> > >> and
> > >> restart the job multiple times.
> > >>
> > >> Following supporting using larger checkpoint for job under backlog status
> > >> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> > >> for job under backlog status in the future.
> > >>
> > >> In short, the updated FLIP looks good to me.
> > >>
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> Thanks again for proposing the isProcessingBacklog concept.
> > >>>
> > >>> After discussing with Becket Qin and thinking about this more, I agree 
> > >>> it
> > >>> is a better idea to add a top-level concept to all source operators to
> > >>> address the target use-case.
> > >>>
> > >>> The main reason that changed my mind is that isProcessingBacklog can be
> > >>> described as an inherent/nature attribute of every source instance and
> > >> its
> > >>> semantics does not need to depend on any specific checkpointing policy.
> > >>> Also, we can hardcode the isProcessingBacklog behavior for the sources 
> > >>> we
> > >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> > >>> asking users to explicitly configure the per-source behavior, which
> > >> indeed
> > >>> provides better user experience.
> > >>>
> > >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> > >> no
> > >>> longer introduces per-source config that can be used by end-users. While
> > >> I
> > >>> agree with you that CheckpointTrigger can be a useful feature to address
> > >>> additional use-cases, I am not sure it is necessary for the use-case
> > >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger 
> > >>> separately
> > >>> in another FLIP?
> > >>>
> > >>> Can you help take another look at the updated FLIP?
> > >>>
> > >>> Best,
> > >>> Dong
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> > >>> wrote:
> > >>>
> >  Hi Dong,
> > 
> > > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > > "backpressured" at a given time (due to random traffic spikes). Then 
> > > at
> >  any
> > > given time, the chance of the job
> > > being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> > >> the
> > > backpressure metric once a second, the estimated time for the job
> > > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> > >> 23163
> > > sec = 6.4 hours.
> > >
> > > This means that the job will effectively always use the longer
> > > checkpointing interval. It looks like a real concern, right?
> > 
> >  Sorry I don't understand where you are getting those numbers from.
> >  Instead of trying to find loophole after loophole, could you try to
> > >> think
> >  how a given loophole could be improved/solved?
> > 
> > > Hmm... I honestly think it will be useful to know 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jingsong Li
Looks good to me!

Thanks Dong, Yunfeng and all for your discussion and design.

Best,
Jingsong

On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
>
> Thank you Dong for driving this FLIP.
>
> The new design looks good to me!
>
> Best,
> Jark
>
> > 2023年6月27日 14:38,Dong Lin  写道:
> >
> > Thank you Leonard for the review!
> >
> > Hi Piotr, do you have any comments on the latest proposal?
> >
> > I am wondering if it is OK to start the voting thread this week.
> >
> > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> >
> >> Thanks Dong for driving this FLIP forward!
> >>
> >> Introducing  `backlog status` concept for flink job makes sense to me as
> >> following reasons:
> >>
> >> From concept/API design perspective, it’s more general and natural than
> >> above proposals as it can be used in HybridSource for bounded records, CDC
> >> Source for history snapshot and general sources like KafkaSource for
> >> historical messages.
> >>
> >> From user cases/requirements, I’ve seen many users manually to set larger
> >> checkpoint interval during backfilling and then set a shorter checkpoint
> >> interval for real-time processing in their production environments as a
> >> flink application optimization. Now, the flink framework can make this
> >> optimization no longer require the user to set the checkpoint interval and
> >> restart the job multiple times.
> >>
> >> Following supporting using larger checkpoint for job under backlog status
> >> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> >> for job under backlog status in the future.
> >>
> >> In short, the updated FLIP looks good to me.
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> Thanks again for proposing the isProcessingBacklog concept.
> >>>
> >>> After discussing with Becket Qin and thinking about this more, I agree it
> >>> is a better idea to add a top-level concept to all source operators to
> >>> address the target use-case.
> >>>
> >>> The main reason that changed my mind is that isProcessingBacklog can be
> >>> described as an inherent/nature attribute of every source instance and
> >> its
> >>> semantics does not need to depend on any specific checkpointing policy.
> >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
> >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> >>> asking users to explicitly configure the per-source behavior, which
> >> indeed
> >>> provides better user experience.
> >>>
> >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> >> no
> >>> longer introduces per-source config that can be used by end-users. While
> >> I
> >>> agree with you that CheckpointTrigger can be a useful feature to address
> >>> additional use-cases, I am not sure it is necessary for the use-case
> >>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> >>> in another FLIP?
> >>>
> >>> Can you help take another look at the updated FLIP?
> >>>
> >>> Best,
> >>> Dong
> >>>
> >>>
> >>>
> >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> >>> wrote:
> >>>
>  Hi Dong,
> 
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
>  any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> >> the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> >> 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
> 
>  Sorry I don't understand where you are getting those numbers from.
>  Instead of trying to find loophole after loophole, could you try to
> >> think
>  how a given loophole could be improved/solved?
> 
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
> 
>  Please propose something. I don't think it's needed.
> 
> > - For the use-case mentioned in FLIP-309 motivation section, would the
>  APIs
> > of this alternative approach be more or less usable?
> 
>  Everything that you originally wanted to achieve in FLIP-309, you could
> >> do
>  as well in my proposal.
>  Vide my many mentions of the "hacky solution".
> 
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
> 
>  I don't see why not.
> 
> > - Can these APIs be decoupled from the APIs currently proposed in
>  FLIP-309?
> 
>  Yes
> 
> > For example, if the APIs of this alternative approach can be decoupled

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jark Wu
Thank you Dong for driving this FLIP. 

The new design looks good to me!

Best,
Jark

> 2023年6月27日 14:38,Dong Lin  写道:
> 
> Thank you Leonard for the review!
> 
> Hi Piotr, do you have any comments on the latest proposal?
> 
> I am wondering if it is OK to start the voting thread this week.
> 
> On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:
> 
>> Thanks Dong for driving this FLIP forward!
>> 
>> Introducing  `backlog status` concept for flink job makes sense to me as
>> following reasons:
>> 
>> From concept/API design perspective, it’s more general and natural than
>> above proposals as it can be used in HybridSource for bounded records, CDC
>> Source for history snapshot and general sources like KafkaSource for
>> historical messages.
>> 
>> From user cases/requirements, I’ve seen many users manually to set larger
>> checkpoint interval during backfilling and then set a shorter checkpoint
>> interval for real-time processing in their production environments as a
>> flink application optimization. Now, the flink framework can make this
>> optimization no longer require the user to set the checkpoint interval and
>> restart the job multiple times.
>> 
>> Following supporting using larger checkpoint for job under backlog status
>> in current FLIP, we can explore supporting larger parallelism/memory/cpu
>> for job under backlog status in the future.
>> 
>> In short, the updated FLIP looks good to me.
>> 
>> 
>> Best,
>> Leonard
>> 
>> 
>>> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks again for proposing the isProcessingBacklog concept.
>>> 
>>> After discussing with Becket Qin and thinking about this more, I agree it
>>> is a better idea to add a top-level concept to all source operators to
>>> address the target use-case.
>>> 
>>> The main reason that changed my mind is that isProcessingBacklog can be
>>> described as an inherent/nature attribute of every source instance and
>> its
>>> semantics does not need to depend on any specific checkpointing policy.
>>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
>>> have considered so far (e.g. HybridSource and MySQL CDC source) without
>>> asking users to explicitly configure the per-source behavior, which
>> indeed
>>> provides better user experience.
>>> 
>>> I have updated the FLIP based on the latest suggestions. The latest FLIP
>> no
>>> longer introduces per-source config that can be used by end-users. While
>> I
>>> agree with you that CheckpointTrigger can be a useful feature to address
>>> additional use-cases, I am not sure it is necessary for the use-case
>>> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
>>> in another FLIP?
>>> 
>>> Can you help take another look at the updated FLIP?
>>> 
>>> Best,
>>> Dong
>>> 
>>> 
>>> 
>>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
>>> wrote:
>>> 
 Hi Dong,
 
> Suppose there are 1000 subtask and each subtask has 1% chance of being
> "backpressured" at a given time (due to random traffic spikes). Then at
 any
> given time, the chance of the job
> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
>> the
> backpressure metric once a second, the estimated time for the job
> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
>> 23163
> sec = 6.4 hours.
> 
> This means that the job will effectively always use the longer
> checkpointing interval. It looks like a real concern, right?
 
 Sorry I don't understand where you are getting those numbers from.
 Instead of trying to find loophole after loophole, could you try to
>> think
 how a given loophole could be improved/solved?
 
> Hmm... I honestly think it will be useful to know the APIs due to the
> following reasons.
 
 Please propose something. I don't think it's needed.
 
> - For the use-case mentioned in FLIP-309 motivation section, would the
 APIs
> of this alternative approach be more or less usable?
 
 Everything that you originally wanted to achieve in FLIP-309, you could
>> do
 as well in my proposal.
 Vide my many mentions of the "hacky solution".
 
> - Can these APIs reliably address the extra use-case (e.g. allow
> checkpointing interval to change dynamically even during the unbounded
> phase) as it claims?
 
 I don't see why not.
 
> - Can these APIs be decoupled from the APIs currently proposed in
 FLIP-309?
 
 Yes
 
> For example, if the APIs of this alternative approach can be decoupled
 from
> the APIs currently proposed in FLIP-309, then it might be reasonable to
> work on this extra use-case with a more advanced/complicated design
> separately in a followup work.
 
 As I voiced my concerns previously, the current design of FLIP-309 would
 clog the public API and in the long run confuse the users. IMO It's
 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thank you Leonard for the review!

Hi Piotr, do you have any comments on the latest proposal?

I am wondering if it is OK to start the voting thread this week.

On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:

> Thanks Dong for driving this FLIP forward!
>
> Introducing  `backlog status` concept for flink job makes sense to me as
> following reasons:
>
> From concept/API design perspective, it’s more general and natural than
> above proposals as it can be used in HybridSource for bounded records, CDC
> Source for history snapshot and general sources like KafkaSource for
> historical messages.
>
> From user cases/requirements, I’ve seen many users manually to set larger
> checkpoint interval during backfilling and then set a shorter checkpoint
> interval for real-time processing in their production environments as a
> flink application optimization. Now, the flink framework can make this
> optimization no longer require the user to set the checkpoint interval and
> restart the job multiple times.
>
> Following supporting using larger checkpoint for job under backlog status
> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> for job under backlog status in the future.
>
> In short, the updated FLIP looks good to me.
>
>
> Best,
> Leonard
>
>
> > On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> >
> > Hi Piotr,
> >
> > Thanks again for proposing the isProcessingBacklog concept.
> >
> > After discussing with Becket Qin and thinking about this more, I agree it
> > is a better idea to add a top-level concept to all source operators to
> > address the target use-case.
> >
> > The main reason that changed my mind is that isProcessingBacklog can be
> > described as an inherent/nature attribute of every source instance and
> its
> > semantics does not need to depend on any specific checkpointing policy.
> > Also, we can hardcode the isProcessingBacklog behavior for the sources we
> > have considered so far (e.g. HybridSource and MySQL CDC source) without
> > asking users to explicitly configure the per-source behavior, which
> indeed
> > provides better user experience.
> >
> > I have updated the FLIP based on the latest suggestions. The latest FLIP
> no
> > longer introduces per-source config that can be used by end-users. While
> I
> > agree with you that CheckpointTrigger can be a useful feature to address
> > additional use-cases, I am not sure it is necessary for the use-case
> > targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> > in another FLIP?
> >
> > Can you help take another look at the updated FLIP?
> >
> > Best,
> > Dong
> >
> >
> >
> > On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi Dong,
> >>
> >>> Suppose there are 1000 subtask and each subtask has 1% chance of being
> >>> "backpressured" at a given time (due to random traffic spikes). Then at
> >> any
> >>> given time, the chance of the job
> >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> the
> >>> backpressure metric once a second, the estimated time for the job
> >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> 23163
> >>> sec = 6.4 hours.
> >>>
> >>> This means that the job will effectively always use the longer
> >>> checkpointing interval. It looks like a real concern, right?
> >>
> >> Sorry I don't understand where you are getting those numbers from.
> >> Instead of trying to find loophole after loophole, could you try to
> think
> >> how a given loophole could be improved/solved?
> >>
> >>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>> following reasons.
> >>
> >> Please propose something. I don't think it's needed.
> >>
> >>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> APIs
> >>> of this alternative approach be more or less usable?
> >>
> >> Everything that you originally wanted to achieve in FLIP-309, you could
> do
> >> as well in my proposal.
> >> Vide my many mentions of the "hacky solution".
> >>
> >>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>> checkpointing interval to change dynamically even during the unbounded
> >>> phase) as it claims?
> >>
> >> I don't see why not.
> >>
> >>> - Can these APIs be decoupled from the APIs currently proposed in
> >> FLIP-309?
> >>
> >> Yes
> >>
> >>> For example, if the APIs of this alternative approach can be decoupled
> >> from
> >>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>> work on this extra use-case with a more advanced/complicated design
> >>> separately in a followup work.
> >>
> >> As I voiced my concerns previously, the current design of FLIP-309 would
> >> clog the public API and in the long run confuse the users. IMO It's
> >> addressing the
> >> problem in the wrong place.
> >>
> >>> Hmm.. do you mean we can do the following:
> >>> - Have all source operators emit a metric named "processingBacklog".
> >>> - Add a job-level config that 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-26 Thread Leonard Xu
Thanks Dong for driving this FLIP forward!

Introducing the `backlog status` concept for Flink jobs makes sense to me
for the following reasons:

From the concept/API design perspective, it is more general and natural than
the above proposals, as it can be used in HybridSource for bounded records,
CDC sources for history snapshots, and general sources like KafkaSource for
historical messages.

From the use-case/requirements perspective, I've seen many users manually set
a larger checkpoint interval during backfilling and then a shorter checkpoint
interval for real-time processing in their production environments as a Flink
application optimization. Now the Flink framework can make this optimization
without requiring the user to change the checkpoint interval and restart the
job multiple times.

Following the support for a larger checkpoint interval for jobs under backlog
status in the current FLIP, we can explore supporting larger
parallelism/memory/CPU for jobs under backlog status in the future.

In short, the updated FLIP looks good to me.


Best,
Leonard


> On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> 
> Hi Piotr,
> 
> Thanks again for proposing the isProcessingBacklog concept.
> 
> After discussing with Becket Qin and thinking about this more, I agree it
> is a better idea to add a top-level concept to all source operators to
> address the target use-case.
> 
> The main reason that changed my mind is that isProcessingBacklog can be
> described as an inherent/nature attribute of every source instance and its
> semantics does not need to depend on any specific checkpointing policy.
> Also, we can hardcode the isProcessingBacklog behavior for the sources we
> have considered so far (e.g. HybridSource and MySQL CDC source) without
> asking users to explicitly configure the per-source behavior, which indeed
> provides better user experience.
> 
> I have updated the FLIP based on the latest suggestions. The latest FLIP no
> longer introduces per-source config that can be used by end-users. While I
> agree with you that CheckpointTrigger can be a useful feature to address
> additional use-cases, I am not sure it is necessary for the use-case
> targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> in another FLIP?
> 
> Can you help take another look at the updated FLIP?
> 
> Best,
> Dong
> 
> 
> 
> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> wrote:
> 
>> Hi Dong,
>> 
>>> Suppose there are 1000 subtask and each subtask has 1% chance of being
>>> "backpressured" at a given time (due to random traffic spikes). Then at
>> any
>>> given time, the chance of the job
>>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
>>> backpressure metric once a second, the estimated time for the job
>>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
>>> sec = 6.4 hours.
>>> 
>>> This means that the job will effectively always use the longer
>>> checkpointing interval. It looks like a real concern, right?
>> 
>> Sorry I don't understand where you are getting those numbers from.
>> Instead of trying to find loophole after loophole, could you try to think
>> how a given loophole could be improved/solved?
>> 
>>> Hmm... I honestly think it will be useful to know the APIs due to the
>>> following reasons.
>> 
>> Please propose something. I don't think it's needed.
>> 
>>> - For the use-case mentioned in FLIP-309 motivation section, would the
>> APIs
>>> of this alternative approach be more or less usable?
>> 
>> Everything that you originally wanted to achieve in FLIP-309, you could do
>> as well in my proposal.
>> Vide my many mentions of the "hacky solution".
>> 
>>> - Can these APIs reliably address the extra use-case (e.g. allow
>>> checkpointing interval to change dynamically even during the unbounded
>>> phase) as it claims?
>> 
>> I don't see why not.
>> 
>>> - Can these APIs be decoupled from the APIs currently proposed in
>> FLIP-309?
>> 
>> Yes
>> 
>>> For example, if the APIs of this alternative approach can be decoupled
>> from
>>> the APIs currently proposed in FLIP-309, then it might be reasonable to
>>> work on this extra use-case with a more advanced/complicated design
>>> separately in a followup work.
>> 
>> As I voiced my concerns previously, the current design of FLIP-309 would
>> clog the public API and in the long run confuse the users. IMO It's
>> addressing the
>> problem in the wrong place.
>> 
>>> Hmm.. do you mean we can do the following:
>>> - Have all source operators emit a metric named "processingBacklog".
>>> - Add a job-level config that specifies "the checkpointing interval to be
>>> used when any source is processing backlog".
>>> - The JM collects the "processingBacklog" periodically from all source
>>> operators and uses the newly added config value as appropriate.
>> 
>> Yes.
>> 
>>> The challenge with this approach is that we need to define the semantics
>> of
>>> this "processingBacklog" metric and have all source operators
>>> implement this 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-21 Thread Dong Lin
Hi Piotr,

Thanks again for proposing the isProcessingBacklog concept.

After discussing with Becket Qin and thinking about this more, I agree it
is a better idea to add a top-level concept to all source operators to
address the target use-case.

The main reason that changed my mind is that isProcessingBacklog can be
described as an inherent/natural attribute of every source instance, and its
semantics do not need to depend on any specific checkpointing policy.
Also, we can hardcode the isProcessingBacklog behavior for the sources we
have considered so far (e.g. HybridSource and the MySQL CDC source) without
asking users to explicitly configure the per-source behavior, which indeed
provides a better user experience.
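
As a minimal sketch of what hardcoding this behavior could look like inside a
connector, assuming the setter on SplitEnumeratorContext lands roughly as
discussed in this thread (treat the exact method name and signature below as
assumptions of this sketch, not a confirmed API):

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Illustrative helper: a CDC-style enumerator could flip the backlog flag when
// it moves from the history-snapshot phase to the real-time changelog phase.
final class BacklogStatusReporter<SplitT extends SourceSplit> {
    private final SplitEnumeratorContext<SplitT> context;

    BacklogStatusReporter(SplitEnumeratorContext<SplitT> context) {
        this.context = context;
    }

    void enterSnapshotPhase() {
        context.setIsProcessingBacklog(true);   // reading history: backlog
    }

    void enterChangelogPhase() {
        context.setIsProcessingBacklog(false);  // caught up to real-time data
    }
}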

I have updated the FLIP based on the latest suggestions. The latest FLIP no
longer introduces per-source config that can be used by end-users. While I
agree with you that CheckpointTrigger can be a useful feature to address
additional use-cases, I am not sure it is necessary for the use-case
targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
in another FLIP?

Can you help take another look at the updated FLIP?

Best,
Dong



On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
>
> Sorry I don't understand where you are getting those numbers from.
> Instead of trying to find loophole after loophole, could you try to think
> how a given loophole could be improved/solved?
>
> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
>
> Please propose something. I don't think it's needed.
>
> > - For the use-case mentioned in FLIP-309 motivation section, would the
> APIs
> > of this alternative approach be more or less usable?
>
> Everything that you originally wanted to achieve in FLIP-309, you could do
> as well in my proposal.
> Vide my many mentions of the "hacky solution".
>
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
>
> I don't see why not.
>
> > - Can these APIs be decoupled from the APIs currently proposed in
> FLIP-309?
>
> Yes
>
> > For example, if the APIs of this alternative approach can be decoupled
> from
> > the APIs currently proposed in FLIP-309, then it might be reasonable to
> > work on this extra use-case with a more advanced/complicated design
> > separately in a followup work.
>
> As I voiced my concerns previously, the current design of FLIP-309 would
> clog the public API and in the long run confuse the users. IMO It's
> addressing the
> problem in the wrong place.
>
> > Hmm.. do you mean we can do the following:
> > - Have all source operators emit a metric named "processingBacklog".
> > - Add a job-level config that specifies "the checkpointing interval to be
> > used when any source is processing backlog".
> > - The JM collects the "processingBacklog" periodically from all source
> > operators and uses the newly added config value as appropriate.
>
> Yes.
>
> > The challenge with this approach is that we need to define the semantics
> of
> > this "processingBacklog" metric and have all source operators
> > implement this metric. I am not sure we are able to do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job read from a bounded Kafka source, should it emit
> > "processingBacklog=true"? If yes, then the job might use long
> checkpointing
> > interval even
> > if the job is asked to process data starting from now to the next 1 hour.
> > If no, then the job might use the short checkpointing interval
> > even if the job is asked to re-process data starting from 7 days ago.
>
> Yes. The same can be said of your proposal. Your proposal has the very same
> issues
> that every source would have to implement it differently, most sources
> would
> have no idea how to properly calculate the new requested checkpoint
> interval,
> for those that do know how to do that, user would have to configure every
> source
> individually and yet again we would end up with a system, that works only
> partially in
> some special use cases (HybridSource), that's confusing the users even
> more.
>
> That's why I think the more generic solution, working primarily on the same
> metrics that are used by various auto scaling solutions (like Flink K8s
> operator's
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-16 Thread Dong Lin
Hi Piotr,

Thanks for the reply. Please see my comments inline.

On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
>
> Sorry I don't understand where you are getting those numbers from.
> Instead of trying to find loophole after loophole, could you try to think
> how a given loophole could be improved/solved?
>

These numbers are fabricated. I guess the point is that the algorithm can
misbehave in some cases.

Honestly speaking, I do hope we can find a way to fix these loopholes
so that we can use one unified approach to address more use-cases than
FLIP-309. I have tried but still cannot find a way to fix these issues.
That leaves me with the option of clarifying these loopholes, so that
hopefully we can agree to use another approach without loopholes for users
who only need the use-case targeted by FLIP-309.

The solution in FLIP-309 would not suffer from these loopholes in the target
use-case. Note that the "hacky" solution can work reliably. But I will have
questions regarding the semantics of related configs, which I will comment
below.


> > Hmm... I honestly think it will be useful to know the APIs due to the
> > following reasons.
>
> Please propose something. I don't think it's needed.


> > - For the use-case mentioned in FLIP-309 motivation section, would the
> APIs
> > of this alternative approach be more or less usable?
>
> Everything that you originally wanted to achieve in FLIP-309, you could do
> as well in my proposal.
> Vide my many mentions of the "hacky solution".
>
> > - Can these APIs reliably address the extra use-case (e.g. allow
> > checkpointing interval to change dynamically even during the unbounded
> > phase) as it claims?
>
> I don't see why not.
>

Hmm.. I thought we had discussed the loopholes that can make the algorithm
unreliable.

For example, the algorithm can use the long checkpointing interval with
high probability when the number of source subtasks is large, even when the job
can catch up with the input traffic overall. That means the data freshness
can be negatively impacted in such cases.

The "hacky" solution can work reliably. I will have questions regarding
the semantics of related configs, which I will comment below.


>
> > - Can these APIs be decoupled from the APIs currently proposed in
> FLIP-309?
>
> Yes
>
> > For example, if the APIs of this alternative approach can be decoupled
> from
> > the APIs currently proposed in FLIP-309, then it might be reasonable to
> > work on this extra use-case with a more advanced/complicated design
> > separately in a followup work.
>
> As I voiced my concerns previously, the current design of FLIP-309 would
> clog the public API and in the long run confuse the users. IMO It's
> addressing the
> problem in the wrong place.
>

I am not sure why the current design of FLIP-309 would "clog" the public API.
Note that the only end-user-facing public API introduced by FLIP-309 is
the upperBoundCheckpointingIntervalForLastSource() API
on HybridSourceBuilder. That means only a small percentage of jobs which
use HybridSource might need to use this API.

The API added on SplitEnumeratorContext is used only by connector developers.
And the change of execution.checkpointing.interval is effectively a
simplification of this config.

So overall it seems that the public API changes introduced by FLIP-309 are
not extensive, and we should be able to extend them naturally to support the
checkpoint-trigger approach later.
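
For context, this is roughly how that builder-level knob was envisioned at this
point of the discussion. The sketch below is illustrative only:
upperBoundCheckpointingIntervalForLastSource() is the method name from this
revision of the proposal rather than a confirmed API, and fileSource (bounded,
historical) and kafkaSource (unbounded, real-time) are assumed to be defined
elsewhere, with the usual imports (java.time.Duration, HybridSource).

// Illustrative only; the upperBound... call is a proposed, not confirmed, API.
HybridSource<String> source =
        HybridSource.<String>builder(fileSource)
                .addSource(kafkaSource)
                .upperBoundCheckpointingIntervalForLastSource(Duration.ofSeconds(30))
                .build();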


> > Hmm.. do you mean we can do the following:
> > - Have all source operators emit a metric named "processingBacklog".
> > - Add a job-level config that specifies "the checkpointing interval to be
> > used when any source is processing backlog".
> > - The JM collects the "processingBacklog" periodically from all source
> > operators and uses the newly added config value as appropriate.
>
> Yes.
>
> > The challenge with this approach is that we need to define the semantics
> of
> > this "processingBacklog" metric and have all source operators
> > implement this metric. I am not sure we are able to do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job read from a bounded Kafka source, should it emit
> > "processingBacklog=true"? If yes, then the job might use long
> checkpointing
> > interval even
> > if the job is asked to process data starting from now to the next 1 hour.
> > If no, then the job might use 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-16 Thread Piotr Nowojski
Hi Dong,

> Suppose there are 1000 subtask and each subtask has 1% chance of being
> "backpressured" at a given time (due to random traffic spikes). Then at
any
> given time, the chance of the job
> being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> backpressure metric once a second, the estimated time for the job
> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> sec = 6.4 hours.
>
> This means that the job will effectively always use the longer
> checkpointing interval. It looks like a real concern, right?

Sorry I don't understand where you are getting those numbers from.
Instead of trying to find loophole after loophole, could you try to think
how a given loophole could be improved/solved?

> Hmm... I honestly think it will be useful to know the APIs due to the
> following reasons.

Please propose something. I don't think it's needed.

> - For the use-case mentioned in FLIP-309 motivation section, would the
APIs
> of this alternative approach be more or less usable?

Everything that you originally wanted to achieve in FLIP-309, you could do
as well in my proposal.
Vide my many mentions of the "hacky solution".

> - Can these APIs reliably address the extra use-case (e.g. allow
> checkpointing interval to change dynamically even during the unbounded
> phase) as it claims?

I don't see why not.

> - Can these APIs be decoupled from the APIs currently proposed in
FLIP-309?

Yes

> For example, if the APIs of this alternative approach can be decoupled
from
> the APIs currently proposed in FLIP-309, then it might be reasonable to
> work on this extra use-case with a more advanced/complicated design
> separately in a followup work.

As I voiced my concerns previously, the current design of FLIP-309 would
clog the public API and in the long run confuse the users. IMO It's
addressing the
problem in the wrong place.

> Hmm.. do you mean we can do the following:
> - Have all source operators emit a metric named "processingBacklog".
> - Add a job-level config that specifies "the checkpointing interval to be
> used when any source is processing backlog".
> - The JM collects the "processingBacklog" periodically from all source
> operators and uses the newly added config value as appropriate.

Yes.

> The challenge with this approach is that we need to define the semantics
of
> this "processingBacklog" metric and have all source operators
> implement this metric. I am not sure we are able to do this yet without
> having users explicitly provide this information on a per-source basis.
>
> Suppose the job read from a bounded Kafka source, should it emit
> "processingBacklog=true"? If yes, then the job might use long
checkpointing
> interval even
> if the job is asked to process data starting from now to the next 1 hour.
> If no, then the job might use the short checkpointing interval
> even if the job is asked to re-process data starting from 7 days ago.

Yes. The same can be said of your proposal. Your proposal has the very same
issues: every source would have to implement it differently, most sources would
have no idea how to properly calculate the newly requested checkpoint interval,
and for those that do know how, the user would have to configure every source
individually. Yet again we would end up with a system that works only
partially, in some special use cases (HybridSource), which confuses the users
even more.

That's why I think the more generic solution, working primarily on the same
metrics that are used by various auto-scaling solutions (like the Flink K8s
operator's autoscaler), would be better. The hacky solution I proposed was
meant to:
1. show you that the generic solution is simply a superset of your proposal
2. show that, if you are adamant that the busyness/backpressured/records
processing rate/pending records metrics wouldn't cover your use case
sufficiently (imo they can), you can very easily enhance this algorithm
with some hints from the sources, like "processingBacklog==true", to
short-circuit the main algorithm if `processingBacklog` is available.

Best,
Piotrek


pt., 16 cze 2023 o 04:45 Dong Lin  napisał(a):

> Hi again Piotr,
>
> Thank you for the reply. Please see my reply inline.
>
> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski 
> wrote:
>
> > Hi again Dong,
> >
> > > I understand that JM will get the backpressure-related metrics every
> time
> > > the RestServerEndpoint receives the REST request to get these metrics.
> > But
> > > I am not sure if RestServerEndpoint is already always receiving the
> REST
> > > metrics at regular interval (suppose there is no human manually
> > > opening/clicking the Flink Web UI). And if it does, what is the
> interval?
> >
> > Good catch, I've thought that metrics are pre-emptively sent to JM every
> 10
> > seconds.
> > Indeed that's not the case at the moment, and that would have to be
> > improved.
> >
> > > I would be surprised if Flink is already paying this much overhead just
> > for
> > > 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Dong Lin
Hi again Piotr,

Thank you for the reply. Please see my reply inline.

On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski 
wrote:

> Hi again Dong,
>
> > I understand that JM will get the backpressure-related metrics every time
> > the RestServerEndpoint receives the REST request to get these metrics.
> But
> > I am not sure if RestServerEndpoint is already always receiving the REST
> > metrics at regular interval (suppose there is no human manually
> > opening/clicking the Flink Web UI). And if it does, what is the interval?
>
> Good catch, I've thought that metrics are pre-emptively sent to JM every 10
> seconds.
> Indeed that's not the case at the moment, and that would have to be
> improved.
>
> > I would be surprised if Flink is already paying this much overhead just
> for
> > metrics monitoring. That is the main reason I still doubt it is true. Can
> > you show where this 100 ms is currently configured?
> >
> > Alternatively, maybe you mean that we should add extra code to invoke the
> > REST API at 100 ms interval. Then that means we need to considerably
> > increase the network/cpu overhead at JM, where the overhead will increase
> > as the number of TM/slots increase, which may pose risk to the
> scalability
> > of the proposed design. I am not sure we should do this. What do you
> think?
>
> Sorry. I didn't mean metric should be reported every 100ms. I meant that
> "backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s."
> once per metric interval (10s?).
>

Suppose there are 1000 subtasks and each subtask has a 1% chance of being
"backpressured" at a given time (due to random traffic spikes). Then at any
given time, the chance of the job
being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
backpressure metric once a second, the estimated time for the job
to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
sec = 6.4 hours.

This means that the job will effectively always use the longer
checkpointing interval. It looks like a real concern, right?
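
For reference, here is a quick back-of-the-envelope check of the arithmetic
above, under the stated assumption of 1000 independent subtasks that are each
backpressured with probability 0.01 at any given one-second evaluation:

public class BackpressureEstimate {
    public static void main(String[] args) {
        int subtasks = 1000;
        double pBackpressured = 0.01;
        // Probability that no subtask is backpressured at a given evaluation.
        double pAllClear = Math.pow(1.0 - pBackpressured, subtasks); // ~4.3e-5
        // With one evaluation per second, the expected wait until an evaluation
        // where the job looks not-backpressured is roughly 1 / pAllClear.
        double expectedSeconds = 1.0 / pAllClear; // ~23164 s, i.e. ~6.4 hours
        System.out.printf("P(all clear)=%.2e, expected wait=%.1f hours%n",
                pAllClear, expectedSeconds / 3600.0);
    }
}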


> > - What is the interface of this CheckpointTrigger? For example, are we
> > going to give CheckpointTrigger a context that it can use to fetch
> > arbitrary metric values? This can help us understand what information
> this
> > user-defined CheckpointTrigger can use to make the checkpoint decision.
>
> I honestly don't think this is important at this stage of the discussion.
> It could have
> whatever interface we would deem to be best. Required things:
>
> - access to at least a subset of metrics that the given `CheckpointTrigger`
> requests,
>   for example via some registration mechanism, so we don't have to fetch
> all of the
>   metrics all the time from TMs.
> - some way to influence `CheckpointCoordinator`. Either via manually
> triggering
>   checkpoints, and/or ability to change the checkpointing interval.
>

Hmm... I honestly think it will be useful to know the APIs due to the
following reasons.

We would need to know the concrete APIs to gauge the following:
- For the use-case mentioned in FLIP-309 motivation section, would the APIs
of this alternative approach be more or less usable?
- Can these APIs reliably address the extra use-case (e.g. allow
checkpointing interval to change dynamically even during the unbounded
phase) as it claims?
- Can these APIs be decoupled from the APIs currently proposed in FLIP-309?

For example, if the APIs of this alternative approach can be decoupled from
the APIs currently proposed in FLIP-309, then it might be reasonable to
work on this extra use-case with a more advanced/complicated design
separately in a followup work.


> > - Where is this CheckpointTrigger running? For example, is it going to
> run
> > on the subtask of every source operator? Or is it going to run on the JM?
>
> IMO on the JM.
>
> > - Are we going to provide a default implementation of this
> > CheckpointTrigger in Flink that implements the algorithm described below,
> > or do we expect each source operator developer to implement their own
> > CheckpointTrigger?
>
> As I mentioned before, I think we should provide at the very least the
> implementation
> that replaces the current triggering mechanism (statically configured
> checkpointing interval)
> and it would be great to provide the backpressure monitoring trigger as
> well.
>

I agree that if there is a good use-case that can be addressed by the
proposed CheckpointTrigger, then it is reasonable
to add CheckpointTrigger and replace the current triggering mechanism with
it.

I also agree that we will likely find such a use-case. For example, if the
source records have event timestamps, it is likely that we can use the trigger
to dynamically control the checkpointing interval based on the difference
between the watermark and the current system time.

But I am not sure the addition of this CheckpointTrigger should be coupled
with FLIP-309. Whether or not it is coupled probably depends on the
concrete API design around 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Piotr Nowojski
Hi again Dong,

> I understand that JM will get the backpressure-related metrics every time
> the RestServerEndpoint receives the REST request to get these metrics. But
> I am not sure if RestServerEndpoint is already always receiving the REST
> metrics at regular interval (suppose there is no human manually
> opening/clicking the Flink Web UI). And if it does, what is the interval?

Good catch, I've thought that metrics are pre-emptively sent to JM every 10
seconds.
Indeed that's not the case at the moment, and that would have to be
improved.

> I would be surprised if Flink is already paying this much overhead just
for
> metrics monitoring. That is the main reason I still doubt it is true. Can
> you show where this 100 ms is currently configured?
>
> Alternatively, maybe you mean that we should add extra code to invoke the
> REST API at 100 ms interval. Then that means we need to considerably
> increase the network/cpu overhead at JM, where the overhead will increase
> as the number of TM/slots increase, which may pose risk to the scalability
> of the proposed design. I am not sure we should do this. What do you
think?

Sorry. I didn't mean metric should be reported every 100ms. I meant that
"backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s."
once per metric interval (10s?).

> - What is the interface of this CheckpointTrigger? For example, are we
> going to give CheckpointTrigger a context that it can use to fetch
> arbitrary metric values? This can help us understand what information this
> user-defined CheckpointTrigger can use to make the checkpoint decision.

I honestly don't think this is important at this stage of the discussion.
It could have
whatever interface we would deem to be best. Required things:

- access to at least a subset of metrics that the given `CheckpointTrigger`
requests,
  for example via some registration mechanism, so we don't have to fetch
all of the
  metrics all the time from TMs.
- some way to influence `CheckpointCoordinator`. Either via manually
triggering
  checkpoints, and/or ability to change the checkpointing interval.
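
To make those two required things concrete, a purely illustrative shape for
such a plugin could look as follows (every name here is hypothetical; this is
a sketch of the idea, not a proposed or existing Flink API):

import java.time.Duration;

// Hypothetical pluggable trigger: it registers the metrics it needs and is
// called back periodically with fresh aggregated values.
interface CheckpointTrigger {
    void open(MetricView metrics, CheckpointControl control);

    void onMetricsUpdated(); // called once per metric update interval
}

// Hypothetical handle used to influence the CheckpointCoordinator.
interface CheckpointControl {
    void triggerCheckpoint();

    void setCheckpointInterval(Duration interval);
}

// Hypothetical read-only view over the subset of metrics the trigger registered.
interface MetricView {
    double maxAcrossSubtasks(String metricName); // e.g. "backPressuredTimeMsPerSecond"
}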

> - Where is this CheckpointTrigger running? For example, is it going to run
> on the subtask of every source operator? Or is it going to run on the JM?

IMO on the JM.

> - Are we going to provide a default implementation of this
> CheckpointTrigger in Flink that implements the algorithm described below,
> or do we expect each source operator developer to implement their own
> CheckpointTrigger?

As I mentioned before, I think we should provide at the very least the
implementation
that replaces the current triggering mechanism (statically configured
checkpointing interval)
and it would be great to provide the backpressure monitoring trigger as
well.
If you are adamant that the backpressure monitoring doesn't cover your use
case well enough, I would be ok to provide the hacky version that I also
mentioned before:

"""
Especially since, if my proposed algorithm didn't work well enough, there is
an obvious solution: any source could add a metric, like, let's say,
"processingBacklog: true/false", and the `CheckpointTrigger`
could use this as an override to always switch to the
"slowCheckpointInterval". I don't think we need it, but that's always an
option that would be basically equivalent to your original proposal.
"""

> - How can users specify the fastCheckpointInterval/slowCheckpointInterval?
> For example, will we provide APIs on the CheckpointTrigger that end-users
> can use to specify the checkpointing interval? What would that look like?

Also as I mentioned before, just like metric reporters are configured:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
Every CheckpointTrigger could have its own custom configuration.

> Overall, my gut feel is that the alternative approach based on
> CheckpointTrigger is more complicated

Yes, as usual, more generic things are more complicated, but often more
useful in the long run.

> and harder to use.

I don't agree. Why would setting this in the config:

execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
1s
execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
30s

where we could even provide a shortcut to the above construct via:

execution.checkpointing.fast-interval: 1s
execution.checkpointing.slow-interval: 30s

be harder than setting two or three checkpoint intervals, one in the
config or via `env.enableCheckpointing(x)`, and additionally one or two
(fast/slow) values on the source itself?

> And it probably also has the issues of "having two places to configure
checkpointing
> interval" and "giving flexibility for every source to implement a
different
> API" (as mentioned below).

No, it doesn't.

> IMO, it is a hard-requirement for the user-facing API to be
> clearly defined and users should be able to use the API 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Dong Lin
Hi Piotr,

Thanks for the explanations. I have some followup questions below.

On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski  wrote:

> Hi All,
>
> Thanks for chipping in the discussion Ahmed!
>
> Regarding using the REST API. Currently I'm leaning towards implementing
> this feature inside the Flink itself, via some pluggable interface.
> REST API solution would be tempting, but I guess not everyone is using
> Flink Kubernetes Operator.
>
> @Dong
>
> > I am not sure metrics such as isBackPressured are already sent to JM.
>
> Fetching code path on the JM:
>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>
> Example code path accessing Task level metrics via JM using the
> `MetricStore`:
>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>

Thanks for the code reference. I checked the code that invokes these two
classes and found the following:

- AggregatingSubtasksMetricsHandler#getStores is currently invoked only
when AggregatingJobsMetricsHandler is invoked.
- AggregatingJobsMetricsHandler is only instantiated and returned by
WebMonitorEndpoint#initializeHandlers.
- WebMonitorEndpoint#initializeHandlers is only used by RestServerEndpoint,
and RestServerEndpoint invokes these handlers in response to external REST
requests.

I understand that the JM will get the backpressure-related metrics every time
the RestServerEndpoint receives a REST request for these metrics. But
I am not sure whether the RestServerEndpoint already receives such REST
requests at a regular interval (assuming no human is manually
opening/clicking the Flink Web UI). And if it does, what is the interval?



> > For example, let's say every source operator subtask reports this metric
> to
> > JM once every 10 seconds. There are 100 source subtasks. And each subtask
> > is backpressured roughly 10% of the total time due to traffic spikes (and
> > limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> > chance that there is at least one subtask that is backpressured. Then we
> > have to wait for at least 10 seconds to check again.
>
> backPressuredTimeMsPerSecond and other related metrics (like
> busyTimeMsPerSecond) are not subject to that problem.
> They are recalculated once every metric fetching interval, and they
> accurately report, on average, how much time the given subtask spent
> busy/idling/backpressured.
> In your example, backPressuredTimeMsPerSecond would report 100ms/s.


Suppose every subtask is already reporting backPressuredTimeMsPerSecond to
JM once every 100 ms. If a job has 10 operators (that are not chained) and
each operator has 100 subtasks, then the JM would need to handle roughly
10,000 requests per second to receive metrics from these 1000 subtasks. That
seems like a non-trivial overhead for medium-to-large sized jobs and could
make the JM a performance bottleneck during job execution.

I would be surprised if Flink is already paying this much overhead just for
metrics monitoring. That is the main reason I still doubt it is true. Can
you show where this 100 ms is currently configured?

Alternatively, maybe you mean that we should add extra code to invoke the
REST API at a 100 ms interval. Then that means we need to considerably
increase the network/cpu overhead at the JM, where the overhead will grow
as the number of TMs/slots increases, which may pose a risk to the
scalability of the proposed design. I am not sure we should do this. What do you think?


>
> > While it will be nice to support additional use-cases
> > with one proposal, it is probably also reasonable to make incremental
> > progress and support the low-hanging-fruit use-case first. The choice
> > really depends on the complexity and the importance of supporting the
> extra
> > use-cases.
>
> That would be true, if that was a private implementation detail or if the
> low-hanging-fruit-solution would be on the direct path to the final
> solution.
> That's unfortunately not the case here. This will add public facing API,
> that we will later need to maintain, no matter what the final solution will
> be,
> and at the moment at least I don't see it being related to a "perfect"
> solution.


Sure. Then let's decide the final solution first.


> > I guess the point is that the suggested approach, which dynamically
> > determines the checkpointing interval based on the backpressure, may
> cause
> > regression when the checkpointing interval is relatively low. This makes
> it
> > hard for users to enable this feature in production. It is like an
> > auto-driving system that is not guaranteed to work
>
> Yes, creating a more generic solution that requires less configuration
> is usually more difficult than static configuration.
> It doesn't mean we shouldn't try to do that. Especially since, if my proposed
> algorithm didn't work well enough, there is
> an obvious solution, that any source could add a 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Piotr Nowojski
Hi All,

Thanks for chipping in the discussion Ahmed!

Regarding using the REST API. Currently I'm leaning towards implementing
this feature inside the Flink itself, via some pluggable interface.
REST API solution would be tempting, but I guess not everyone is using
Flink Kubernetes Operator.

@Dong

> I am not sure metrics such as isBackPressured are already sent to JM.

Fetching code path on the JM:
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add

Example code path accessing Task level metrics via JM using the
`MetricStore`:
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler

> For example, let's say every source operator subtask reports this metric
to
> JM once every 10 seconds. There are 100 source subtasks. And each subtask
> is backpressured roughly 10% of the total time due to traffic spikes (and
> limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> chance that there is at least one subtask that is backpressured. Then we
> have to wait for at least 10 seconds to check again.

backPressuredTimeMsPerSecond and other related metrics (like
busyTimeMsPerSecond) are not subject to that problem.
They are recalculated once every metric fetching interval, and they
accurately report, on average, how much time the given subtask spent
busy/idling/backpressured.
In your example, backPressuredTimeMsPerSecond would report 100ms/s.

> While it will be nice to support additional use-cases
> with one proposal, it is probably also reasonable to make incremental
> progress and support the low-hanging-fruit use-case first. The choice
> really depends on the complexity and the importance of supporting the
extra
> use-cases.

That would be true, if that was a private implementation detail or if the
low-hanging-fruit-solution would be on the direct path to the final
solution.
That's unfortunately not the case here. This will add public facing API,
that we will later need to maintain, no matter what the final solution will
be,
and at the moment at least I don't see it being related to a "perfect"
solution.

> I guess the point is that the suggested approach, which dynamically
> determines the checkpointing interval based on the backpressure, may cause
> regression when the checkpointing interval is relatively low. This makes
it
> hard for users to enable this feature in production. It is like an
> auto-driving system that is not guaranteed to work

Yes, creating a more generic solution that requires less configuration
is usually more difficult than static configuration.
It doesn't mean we shouldn't try to do that. Especially since, if my proposed
algorithm didn't work well enough, there is an obvious solution: any source
could add a metric, like, let's say, "processingBacklog: true/false", and the
`CheckpointTrigger` could use this as an override to always switch to the
"slowCheckpointInterval". I don't think we need it, but that's always an
option that would be basically equivalent to your original proposal. Or a
source could even add "suggestedCheckpointInterval : int", and the
`CheckpointTrigger` could use that value, if present, as a hint in one way or
another.

> On the other hand, the approach currently proposed in the FLIP is much
> simpler as it does not depend on backpressure. Users specify the extra
> interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
> Source) and can easily know the checkpointing interval will be used on the
> continuous phase of the corresponding source. This is pretty much same as
> how users use the existing execution.checkpointing.interval config. So
> there is no extra concern of regression caused by this approach.

To an extent, but as I have already previously mentioned I really really do
not like idea of:
  - having two places to configure checkpointing interval (config file and
in the Source builders)
  - giving flexibility for every source to implement a different API for
that purpose
  - creating a solution that is not generic enough, so that we will need a
completely different mechanism in the future anyway

> Sounds good. Looking forward to learning more ideas.

I have thought about this a bit more, and I think we don't need to check
the backpressure status, or how overloaded all of the operators are.
We could just check three things for source operators:
1. pendingRecords (backlog length)
2. numRecordsInPerSecond
3. backPressuredTimeMsPerSecond

// int metricsUpdateInterval = 10s // obtained from config
// The next line calculates how many records we could consume from the
// backlog, assuming that the reason behind the backpressure magically
// vanishes. We will use this only as a safeguard against scenarios where,
// for example, the backpressure was caused by some intermittent
// failure/performance degradation.
maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1000
- 
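
Purely as an illustration of the safeguard being described (the exact scaling
and the names below are assumptions on my part, not the formula from this
message), the estimate could be computed along these lines:

// All names assumed; backPressuredTimeMsPerSecond is in [0, 1000].
// Extrapolate the observed ingest rate to what the source could consume
// if it were never blocked by backpressure (a real implementation would
// guard against a fully backpressured subtask, i.e. division by zero):
double nonBackpressuredMsPerSecond = 1000.0 - backPressuredTimeMsPerSecond;
double maxConsumptionPerSecond =
        numRecordsInPerSecond * 1000.0 / nonBackpressuredMsPerSecond;
// How much of the backlog could be drained within one metrics update interval:
double maxRecordsConsumedWithoutBackpressure =
        maxConsumptionPerSecond * metricsUpdateIntervalSeconds;
// Safeguard idea: if pendingRecords is no larger than this, the backpressure
// is likely intermittent and the fast checkpoint interval can be kept.
boolean backlogLooksTransient =
        pendingRecords <= maxRecordsConsumedWithoutBackpressure;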

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Ahmed Hamdy
Hi Dong,
Thanks for the quick reply and for clarification, yeah that makes sense!
Best Regards,
Ahmed Hamdy

On Fri, 2 Jun 2023 at 02:59, Dong Lin  wrote:

> Hi Ahmed,
>
> Thanks for the comments.
>
> I agree with you and Piotr that it would be useful to provide a more
> generic approach to address more use-case in one proposal. On the other
> hand, I also think it is important to make sure that the alternative (more
> generic) approach can indeed address the extra use-cases reliably as
> expected. Then we can compare the pros/cons of these approaches and make
> the best choice for Flink users.
>
> If I understand your question correctly, you are asking whether it would be
> better to replace upperBoundCheckpointingIntervalForLastSource() with an
> API on the source/operator interface.
>
> The short answer is probably no. This is because the expected users of the
> API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
> are end-users who use Flink API and connector API to develop Flink job. We
> probably don't want end-users to directly use the source/operator
> interface, which is generally more complicated and intended to be used by
> developers of source operators.
>
> FLIP-309 currently proposes to add the API
> *SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
> source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
> checkpointing interval. Are you suggesting that we should replace this API
> with a config on the source or operator constructor?
>
> This approach probably works for HybridSource. But I am not sure it works
> for MySQL CDC Source (which is also mentioned in the latest FLIP-309
> motivation section), which is implemented as one source operator rather
> than multiple source operators (which HybridSource does). And we need to
> enable the new checkpointing interval in the middle of this source
> operator's execution.
>
> If I misunderstood your suggestion, can you provide more details regarding
> the proposed API and explain its benefits?
>
> Best,
> Dong
>
>
>
> On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy  wrote:
>
> > Hi Dong,
> > Thanks for the great proposal.
> > The thread is very intuitive along with suggestions from Jing and Piotr.
> > As much as I like the simplicity of the proposed approach I think a much
> > wider benefit is achieved by taking a more generic approach similar to
> > Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> > solidifies the argument you are discussing
> > >  On the other hand, the approach currently proposed in the FLIP is much
> > simpler as it does not depend on backpressure. Users specify the extra
> > interval requirement on the specific sources (e.g. HybridSource, MySQL
> CDC
> > Source) and can easily know the checkpointing interval will be used on
> the
> > continuous phase of the corresponding source.
> >
> > where the base HybridSource can use a `CheckpointTrigger` that doesn't
> > depend on backpressure.
> >
> >
> >
> >
> > I have a couple of questions for clarification.
> >
> > @Dong
> > Do you think in the solution in FLIP 309, instead of using
> > ```
> > /**
> >  * Upper-bound the checkpointing interval when the last source added right
> >  * before this method invocation is reading data.
> >  */
> > public <NextSourceT extends Source<T, ?, ?>>
> >         HybridSourceBuilder<T, EnumT> upperBoundCheckpointingIntervalForLastSource(
> >                 Duration duration) {
> > ...
> > }
> > ```
> >
> > We can have an upperBoundCheckpointingInterval configured in the Source
> > Interface, or even better in the Operator one.
> > then we can easily implement the one for HybridSource by relying on
> > delegation to the `currentReader`.
> >
> >
> > @Piotr
> >
> > Regarding the more general approach of adjusting based on generic
> > triggers/backpressure metrics. I saw you mentioned the resemblance with
> > FLIP-271,
> > Do you think it is worth going with the REST API proposal for dynamically
> > configuring the interval, so that the trigger logic could be implemented in
> > Flink or in external systems like the Flink Kubernetes Operator?
> > Wdyt? I think the REST API proposal here sounds more and more
> interesting.
> >
> >
> > Best Regards,
> > Ahmed Hamdy
> >
> >
> > On Wed, 31 May 2023 at 07:59, Dong Lin  wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the reply. Please see my comments inline.
> > >
> > > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > First of all we don't need to send any extra signal from source (or
> non
> > > > source) operators. All of the operators are already reporting
> > > backpressured
> > > > metrics [1]
> > > > and all of the metrics are already sent to JobManager. We would only
> > need
> > > >
> > >
> > > Hmm... I am not sure metrics such as isBackPressured are already sent
> to
> > > JM. According to the doc
> > > <
> >
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-01 Thread Dong Lin
Hi Ahmed,

Thanks for the comments.

I agree with you and Piotr that it would be useful to provide a more
generic approach to address more use-case in one proposal. On the other
hand, I also think it is important to make sure that the alternative (more
generic) approach can indeed address the extra use-cases reliably as
expected. Then we can compare the pros/cons of these approaches and make
the best choice for Flink users.

If I understand your question correctly, you are asking whether it would be
better to replace upperBoundCheckpointingIntervalForLastSource() with an
API on the source/operator interface.

The short answer is probably no. This is because the expected users of the
API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
are end-users who use Flink API and connector API to develop Flink job. We
probably don't want end-users to directly use the source/operator
interface, which is generally more complicated and intended to be used by
developers of source operators.

FLIP-309 currently proposes to add the API
*SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
checkpointing interval. Are you suggesting that we should replace this API
with a config on the source or operator constructor?

This approach probably works for HybridSource. But I am not sure it works
for MySQL CDC Source (which is also mentioned in the latest FLIP-309
motivation section), which is implemented as one source operator rather
than multiple source operators (which HybridSource does). And we need to
enable the new checkpointing interval in the middle of this source
operator's execution.

If I misunderstood your suggestion, can you provide more details regarding
the proposed API and explain its benefits?

Best,
Dong



On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy  wrote:

> Hi Dong,
> Thanks for the great proposal.
> The thread is very intuitive along with suggestions from Jing and Piotr.
> As much as I like the simplicity of the proposed approach I think a much
> wider benefit is achieved by taking a more generic approach similar to
> Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> solidifies the argument you are discussing
> >  On the other hand, the approach currently proposed in the FLIP is much
> simpler as it does not depend on backpressure. Users specify the extra
> interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
> Source) and can easily know the checkpointing interval will be used on the
> continuous phase of the corresponding source.
>
> where the base HybridSource can use a `CheckpointTrigger` that doesn't
> depend on backpressure.
>
>
>
>
> I have a couple of questions for clarification.
>
> @Dong
> Do you think in the solution in FLIP 309, instead of using
> ```
> /**
>  * Upper-bound the checkpointing interval when the last source added right
>  * before this method invocation is reading data.
>  */
> public <NextSourceT extends Source<T, ?, ?>>
>         HybridSourceBuilder<T, EnumT> upperBoundCheckpointingIntervalForLastSource(
>                 Duration duration) {
> ...
> }
> ```
>
> We can have an upperBoundCheckpointingInterval configured in the Source
> Interface, or even better in the Operator one.
> then we can easily implement the one for HybridSource by relying on
> delegation to the `currentReader`.
>
>
> @Piotr
>
> Regarding the more general approach of adjusting based on generic
> triggers/backpressure metrics. I saw you mentioned the resemblance with
> FLIP-271,
> Do you think it is worth going with the REST API proposal for dynamically
> configuring the interval, so that the trigger logic could be implemented in
> Flink or in external systems like the Flink Kubernetes Operator?
> Wdyt? I think the REST API proposal here sounds more and more interesting.
>
>
> Best Regards,
> Ahmed Hamdy
>
>
> On Wed, 31 May 2023 at 07:59, Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > First of all we don't need to send any extra signal from source (or non
> > > source) operators. All of the operators are already reporting
> > backpressured
> > > metrics [1]
> > > and all of the metrics are already sent to JobManager. We would only
> need
> > >
> >
> > Hmm... I am not sure metrics such as isBackPressured are already sent to
> > JM. According to the doc
> > <
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > >,
> > this metric is only available on the TaskManager. And I couldn't find the
> > code that sends these metrics to JM. Can you help provide a link to the code
> > and doc that show this metric is reported to JM?
> >
> > Suppose this metric is indeed reported to JM, we also need to confirm
> that
> > the frequency meets our need. For example, typically 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-01 Thread Ahmed Hamdy
Hi Dong,
Thanks for the great proposal.
The thread is very intuitive along with suggestions from Jing and Piotr.
As much as I like the simplicity of the proposed approach I think a much
wider benefit is achieved by taking a more generic approach similar to
Piotr's suggestion of having a `CheckpointTrigger`. I think this even
solidifies the argument you are discussing
>  On the other hand, the approach currently proposed in the FLIP is much
simpler as it does not depend on backpressure. Users specify the extra
interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
Source) and can easily know the checkpointing interval will be used on the
continuous phase of the corresponding source.

where the base HybridSource can use a `CheckpointTrigger` that doesn't
depend on backpressure.




I have a couple of questions for clarification.

@Dong
Do you think in the solution in FLIP 309, instead of using
```
/**
 * Upper-bound the checkpointing interval when the last source added right
 * before this method invocation is reading data.
 */
public <NextSourceT extends Source<T, ?, ?>>
        HybridSourceBuilder<T, EnumT> upperBoundCheckpointingIntervalForLastSource(
                Duration duration) {
...
}
```

We can have an upperBoundCheckpointingInterval configured in the Source
Interface, or even better in the Operator one.
then we can easily implement the one for HybridSource by relying on
delegation to the `currentReader`.


@Piotr

Regarding the more general approach of adjusting based on generic
triggers/backpressure metrics. I saw you mentioned the resemblance with
FLIP-271,
Do you think it is worth going with the REST API proposal for dynamically
configuring the interval, so that the trigger logic could be implemented in
Flink or in external systems like the Flink Kubernetes Operator?
Wdyt? I think the REST API proposal here sounds more and more interesting.


Best Regards,
Ahmed Hamdy


On Wed, 31 May 2023 at 07:59, Dong Lin  wrote:

> Hi Piotr,
>
> Thanks for the reply. Please see my comments inline.
>
> On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
> wrote:
>
> > Hi Dong,
> >
> > First of all we don't need to send any extra signal from source (or non
> > source) operators. All of the operators are already reporting
> backpressured
> > metrics [1]
> > and all of the metrics are already sent to JobManager. We would only need
> >
>
> Hmm... I am not sure metrics such as isBackPressured are already sent to
> JM. According to the doc
> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io>,
> this metric is only available on the TaskManager. And I couldn't find the
> code that sends these metrics to JM. Can you help provide a link to the code
> and doc that show this metric is reported to JM?
>
> Suppose this metric is indeed reported to JM, we also need to confirm that
> the frequency meets our need. For example, typically metrics are updated on
> the order of seconds. The default metric reporter interval (as specified in
> MetricOptions) is 10 seconds, which is probably not sufficient for the
> suggested approach to work reliably. This is because the longer the
> interval, the more likely that the algorithm will not trigger checkpoint
> using the short interval even if all subtasks are not-backpressured.
>
> For example, let's say every source operator subtask reports this metric to
> JM once every 10 seconds. There are 100 source subtasks. And each subtask
> is backpressured roughly 10% of the total time due to traffic spikes (and
> limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> chance that there is at least one subtask that is backpressured. Then we
> have to wait for at least 10 seconds to check again. The expected
> checkpointing interval can be very close to 30 minutes in the use-case
> mentioned earlier.
>
> to pass some accessor to the metrics to the `CheckpointTrigger`.
>
>
> > > execution.checkpointing.interval.no-backpressure
> >
> > Maybe that's the way to go, but as I mentioned before, I could see this
> > `CheckpointTrigger` to be a pluggable component, that could have been
> > configured
> > the same way as `MetricReporters` are right now [2]. We could just
> provide
> > out of the box two plugins, one implementing current checkpoint
> triggering
> > strategy,
> > and the other using backpressure.
> >
>
> Yes, it is possible to add a CheckpointTrigger as a pluggable component. I
> am open to this idea as long as it provides benefits over the job-level
> config (e.g. covers more use-case, or simpler configuration for
> common-case).
>
> I think we can decide how to let user specify this interval after we are
> able to address the other issues related to the feasibility and reliability
> of the suggested approach.
>
>
> > > I think the root cause of this issue is that the decision of the
> > > checkpointing interval really depends on the expected impact of a
> > > checkpoint on the throughput.
> >
> > Yes, I agree. 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-31 Thread Dong Lin
Hi Piotr,

Thanks for the reply. Please see my comments inline.

On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> First of all we don't need to send any extra signal from source (or non
> source) operators. All of the operators are already reporting backpressured
> metrics [1]
> and all of the metrics are already sent to JobManager. We would only need
>

Hmm... I am not sure metrics such as isBackPressured are already sent to
JM. According to the doc
<https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io>,
this metric is only available on the TaskManager. And I couldn't find the
code that sends these metrics to JM. Can you help provide a link to the code
and doc that show this metric is reported to JM?

Suppose this metric is indeed reported to JM, we also need to confirm that
the frequency meets our need. For example, typically metrics are updated on
the order of seconds. The default metric reporter interval (as specified in
MetricOptions) is 10 seconds, which is probably not sufficient for the
suggested approach to work reliably. This is because the longer the
interval, the more likely that the algorithm will not trigger checkpoint
using the short interval even if all subtasks are not-backpressured.

For example, let's say every source operator subtask reports this metric to
JM once every 10 seconds. There are 100 source subtasks. And each subtask
is backpressured roughly 10% of the total time due to traffic spikes (and
limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
chance that there is at least one subtask that is backpressured. Then we
have to wait for at least 10 seconds to check again. The expected
checkpointing interval can be very close to 30 minutes in the use-case
mentioned earlier.

to pass some accessor to the metrics to the `CheckpointTrigger`.


> > execution.checkpointing.interval.no-backpressure
>
> Maybe that's the way to go, but as I mentioned before, I could see this
> `CheckpointTrigger` to be a pluggable component, that could have been
> configured
> the same way as `MetricReporters` are right now [2]. We could just provide
> out of the box two plugins, one implementing current checkpoint triggering
> strategy,
> and the other using backpressure.
>

Yes, it is possible to add a CheckpointTrigger as a pluggable component. I
am open to this idea as long as it provides benefits over the job-level
config (e.g. covers more use-case, or simpler configuration for
common-case).

I think we can decide how to let user specify this interval after we are
able to address the other issues related to the feasibility and reliability
of the suggested approach.


> > I think the root cause of this issue is that the decision of the
> > checkpointing interval really depends on the expected impact of a
> > checkpoint on the throughput.
>
> Yes, I agree. Ideally we probably should adjust the checkpointing interval
> based on measured latency, for example using latency markers [3], but that
> would require some investigation into whether latency markers are indeed as
> costly as documented and, if so, optimizing them to solve the performance
> degradation of enabling e2e latency tracking.


> However, given that the new back pressure monitoring strategy would be
> optional AND users could implement their own `CheckpointTrigger` if really
> needed
> AND I have a feeling that there might be an even better solution (more
> about that later).
>

Overall I guess you are suggesting that 1) we can optimize the overhead of
latency tracking so that we can always turn it on and 2) we can use the
measured latency to dynamically determine checkpointing interval.

I can understand this intuition. Still, the devil is in the details. After
thinking more about this, I am not sure I can find a good way to make it
work. I am happy to discuss the pros/cons if you provide more concrete
solutions.

Note that the goals of the alternative approach include 1) supporting sources
other than HybridSource and 2) reducing the checkpointing interval when the job
is backpressured. These goals are not necessary to achieve the use-case
targeted by FLIP-309. While it will be nice to support additional use-cases
with one proposal, it is probably also reasonable to make incremental
progress and support the low-hanging-fruit use-case first. The choice
really depends on the complexity and the importance of supporting the extra
use-cases.

I am hoping that we can still be open to using the approach proposed in
FLIP-309 if we cannot make the alternative approach work. What do you
think?


> > if the checkpointing overhead is
> > close to none, then it is beneficial to the e2e latency to still
> checkpoint
> > a high frequency even if there exists (intermittent) backpressure.
>
> In that case users could just configure a slow checkpointing interval to a
> lower value, or just use static checkpoint interval strategy.
>

I guess the point is that the suggested approach, which dynamically
determines the 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi Dong,

First of all we don't need to send any extra signal from source (or non
source) operators. All of the operators are already reporting backpressured
metrics [1]
and all of the metrics are already sent to JobManager. We would only need
to pass some accessor to the metrics to the `CheckpointTrigger`.

> execution.checkpointing.interval.no-backpressure

Maybe that's the way to go, but as I mentioned before, I could see this
`CheckpointTrigger` to be a pluggable component, that could have been
configured
the same way as `MetricReporters` are right now [2]. We could just provide
out of the box two plugins, one implementing current checkpoint triggering
strategy,
and the other using backpressure.

> I think the root cause of this issue is that the decision of the
> checkpointing interval really depends on the expected impact of a
> checkpoint on the throughput.

Yes, I agree. Ideally we probably should adjust the checkpointing interval
based on measured latency, for example using latency markers [3], but that
would require some investigation into whether latency markers are indeed as
costly as documented and, if so, optimizing them to solve the performance
degradation of enabling e2e latency tracking.

However, given that the new back pressure monitoring strategy would be
optional AND users could implement their own `CheckpointTrigger` if really
needed
AND I have a feeling that there might be an even better solution (more
about that later).

> if the checkpointing overhead is
> close to none, then it is beneficial to the e2e latency to still
checkpoint
> a high frequency even if there exists (intermittent) backpressure.

In that case users could just configure a slow checkpointing interval to a
lower value, or just use static checkpoint interval strategy.

> With the suggested approach, the e2e latency introduced by Flink is
roughly
> 72 seconds. This is because it takes 1 minute for 11MBps phase to end, and
> another 12 seconds for the accumulated backlog to be cleared. And Flink
can
> not do checkpoint before the backlog is cleared.

Indeed that's a valid concern. After thinking more about this issue, maybe
the proper solution would be to calculate "how overloaded the most
overloaded subtask is".
In this case, that would be 10% (we are trying to push 110% of the
available capacity in the current job/cluster). Then we could use that
number as the basis for some kind of weighted average.
We could figure out a function mapping the overload percentage into a
floating-point number in the range [0, 1]:

f(overload_factor) = weight // weight is from [0, 1]

and then the desired checkpoint interval would be something like

(1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval

In your problematic example, we would like the weight to be pretty small
(<10%?), so the calculated checkpoint interval would be pretty close to the
fastCheckpointInterval.

The overload factor we could calculate the same way as FLIP-271 is
calculating how much should we rescale given operator [4].
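
A minimal sketch of that weighting, where f() and all other names are just
placeholders for illustration rather than a concrete proposal:

static long blendedIntervalMs(double weight, long fastIntervalMs, long slowIntervalMs) {
    // weight in [0, 1]: 0 -> use the fast interval, 1 -> use the slow interval
    double w = Math.min(1.0, Math.max(0.0, weight));
    return Math.round((1 - w) * fastIntervalMs + w * slowIntervalMs);
}

// e.g. with f(0.10) mapping a 10% overload to a small weight, the result
// stays close to fastIntervalMs:
// long intervalMs = blendedIntervalMs(f(0.10), 1_000, 30_000);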

I can think about this more and elaborate/refine this idea tomorrow.

Best,
Piotrek


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling

On Tue, May 30, 2023 at 1:58 PM Dong Lin  wrote:

> Hi Piotr,
>
> Thank you for providing those details.
>
> I understand you suggested using the existing "isBackPressured" signal to
> determine whether we should use the less frequent checkpointing interval. I
> followed your thoughts and tried to make it work. Below are the issues that
> I am not able to address. Can you see if there is a way to address these
> issues?
>
> Let's use the following use-case to make the discussion more concrete:
> a) Users want to checkpoint at least once every 30 minutes to upper-bound
> the amount of duplicate work after job failover.
> b) Users want to checkpoint at least once every 30 seconds to
> upper-bound *extra
> e2e lag introduced by the Flink job* during the continuous processing
> phase.
>
> The suggested approach is designed to do this:
> - If any of the source subtasks is backpressured, the job will checkpoint
> at 30-minutes interval.
> - If none of the source subtasks is backpressured, the job will checkpoint
> at 30-seconds interval.
>
> And we would need to add the following public APIs to implement this
> approach:
> - Add a job level config, maybe
> execution.checkpointing.interval.no-backpressure. This is the checkpointing
> interval when none of the source subtasks is backpressured.
> - Add a public API for source operator subtasks to report their
> backpressure status to the checkpointing coordinator. The subtask should
> invoke this API whenever its backpressure status changed.
>
> Now, in order to 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Jing Ge
Hi Piotr,


> But why do we need to have two separate mechanisms, if the dynamic
> adjustment based on the backpressure/backlog would
> achieve basically the same goal as your proposal and would solve both of
> the problems? Having two independent solutions
> in the same codebase, in the docs, that are achieving basically the same
> thing is far from ideal. It would increase both the
> complexity of the system and confuse potential users.
>

Great question! Well, my answer is: there is no silver bullet. Therefore I
don't see any problem if there is more than one mechanism to solve the
same issue but from a different perspective. Even for solving the
checkpoint duration issue, we have Unaligned checkpoints and Buffer
debloating to address back pressure (again, same goal, two mechanisms), and
incremental checkpointing, and log-based incremental checkpoints.

If we take a look at the dynamic adjustment from a different perspective,
it could be considered as a tradeoff - we have to rely on the back pressure
metrics to do a pure technical performance improvement without knowing any
usage context, because we don't know the real requirement in advance. For
some use cases, it is the only choice, i.e. a tradeoff, but for other
cases, there are better ways based on the input information in advance.

As I mentioned previously, what if the user doesn't want to trigger too
many checkpoints (known info in advance), even if there is no back pressure
at all? What if the user doesn't want to change the checkpoint interval if
there are back pressures?  Should users have the control or should the
system just wildly adjust everything and ignore users' needs? BTW, if I am
not mistaken, your proposal is the third mechanism that will be mixed up
with the above mentioned Unaligned checkpoints and Buffer debloating. All
of them will affect checkpointing. Users will be confused.

Like I mentioned previously, data-driven dynamic adjustments are a good
thing, and we should have them. But if the statement is that dynamic
adjustment should be the one and only mechanism, I think we should
reconsider it carefully :-)

Best regards,
Jing

On Mon, May 29, 2023 at 5:23 PM Piotr Nowojski  wrote:

> Hi
>
> @Jing
>
> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
> It
> > makes sense to build it as a generic feature in Flink. Looking forward to
> > it. However, for some user cases, e.g. when users were aware of the
> bounded
> > sources (in the HybridSource) and care more about the throughput, the
> > dynamic adjustment might not be required. Just let those bounded sources
> > always have larger checkpoint intervals even when there is no back
> > pressure. Because no one cares about latency in this case, let's turn off
> > the dynamic adjustment, reduce the checkpoint frequency, have better
> > throughput, and save unnecessary source consumption. Did I miss anything
> > here?
>
>

> But why do we need to have two separate mechanisms, if the dynamic
> adjustment based on the backpressure/backlog would
> achieve basically the same goal as your proposal and would solve both of
> the problems? Having two independent solutions
> in the same codebase, in the docs, that are achieving basically the same
> thing is far from ideal. It would increase both the
> complexity of the system and confuse potential users.
>


>
> Moreover, as I have already mentioned before, I don't like the current
> proposal as it's focusing ONLY on the HybridSource,
> which can lead to even worse problem in the future, where many different
> sources would have each a completely custom
> solution to solve the same/similar problems, complicating the system and
> confusing the users even more.
>
> @Dong,
>
> > For now I am not able to come up with a good way to support this. I am
> happy to discuss the
> > pros/cons if you can provide more detail (e.g. API design) regarding how
> to support this approach
>
> I have already described such proposal:
>
> > Piotr:
> > I don't know, maybe instead of adding this logic to operator
> coordinators, `CheckpointCoordinator` should have a pluggable
> `CheckpointTrigger`,
> > that the user could configure like a `MetricReporter`. The default one
> would be just periodically triggering checkpoints. Maybe
> > `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured
> > threshold and based on that adjust the checkpointing interval
> accordingly? This would at least address some of my concerns.
>
> plus
>
> > Piotr:
> >  Either way, I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> > slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
> >
> > As a user, I would like to have my regular fast checkpointing interval
> for low latency, but the moment my system is not keeping up, if the
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Dong Lin
Hi Piotr,

Thank you for providing those details.

I understand you suggested using the existing "isBackPressured" signal to
determine whether we should use the less frequent checkpointing interval. I
followed your thoughts and tried to make it work. Below are the issues that
I am not able to address. Can you see if there is a way to address these
issues?

Let's use the following use-case to make the discussion more concrete:
a) Users want to checkpoint at least once every 30 minutes to upper-bound
the amount of duplicate work after job failover.
b) Users want to checkpoint at least once every 30 seconds to
upper-bound *extra
e2e lag introduced by the Flink job* during the continuous processing phase.

The suggested approach is designed to do this:
- If any of the source subtasks is backpressured, the job will checkpoint
at 30-minutes interval.
- If none of the source subtasks is backpressured, the job will checkpoint
at 30-seconds interval.

And we would need to add the following public APIs to implement this
approach:
- Add a job level config, maybe
execution.checkpointing.interval.no-backpressure. This is the checkpointing
interval when none of the source subtasks is backpressured.
- Add a public API for source operator subtasks to report their
backpressure status to the checkpointing coordinator. The subtask should
invoke this API whenever its backpressure status changed.
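
For the use-case above, and assuming the existing
execution.checkpointing.interval keeps acting as the interval used while
backpressured, the configuration would then boil down to something like:

execution.checkpointing.interval: 30 min
execution.checkpointing.interval.no-backpressure: 30 s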

Now, in order to make the suggested approach work for all users (i.e. no
regression), we need to make sure that whenever we use the 30-minutes
checkpointing interval, the e2e latency will be less than or equal to the
case where we use the 30-seconds checkpointing interval.

I thought about this in detail, and found the following fabricated
scenarios where this approach might cause regression:

During the continuous processing phase, the input throughput is 5MBps for 1
minute, and 11MBps for 1 minute, in lock-step. The maximum throughput
achievable by this job is 10MBps. For simplicity, suppose the buffer can
hold roughly 1 second's worth of data; then the job is
backpressured roughly 1 minute out of every 2 minutes.

With the suggested approach, the e2e latency introduced by Flink is roughly
72 seconds. This is because it takes 1 minute for 11MBps phase to end, and
another 12 seconds for the accumulated backlog to be cleared. And Flink can
not do checkpoint before the backlog is cleared.
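
(Spelling out the arithmetic behind the 72 seconds, using the numbers above:

  backlog built up during the 11 MBps minute: (11 - 10) MBps * 60 s = 60 MB
  spare capacity during the following 5 MBps minute: 10 - 5 = 5 MBps
  time to drain the backlog: 60 MB / 5 MBps = 12 s
  extra e2e latency: 60 s + 12 s = 72 s)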

On the other hand, if we continue to checkpoint at 30-seconds interval, the
e2e latency introduced by Flink is at most 42 seconds, plus the extra delay
introduced by the checkpoint overhead. The e2e latency will be better than
the suggested approach, if the impact of the checkpoint is less than 30
seconds.

I think the root cause of this issue is that the decision of the
checkpointing interval really depends on the expected impact of a
checkpoint on the throughput. For example, if the checkpointing overhead is
close to none, then it is beneficial to the e2e latency to still checkpoint
a high frequency even if there exists (intermittent) backpressure.

Here is another fabricated use-case where the suggested approach might
cause regression. Let's say user's job is
*hybridSource.keyBy(...).transform(operatorA).sinkTo(PaimonSink)*. The
parallelism is 2. As we can see, there is an all-to-all edge between source
and operatorA. And due to limited resources (e.g. buffer), at any given
time, each operatorA subtask can only process data from one of its upstream
subtasks at a time, meaning that the other upstream subtask will be
backpressured. So there might always be at least one source subtask that is
backpressured even though the job's throughput can catch up with the input
throughput. However, the suggested approach might end up always using the
less frequent checkpointing interval in this case.

Suppose we can find a way to address the above issues, another issue with
the suggested approach is the extra communication overhead between the
source operator subtasks and the checkpointing coordinator. The source
subtask needs to send a message to checkpointing coordinator whenever its
backpressure status changes. The more frequently we check (e.g. once every
10 ms), the larger the overhead. And if we check not so frequently (e.g.
once every second), we might be more vulnerable to random/occasional
backpressure. So there seems to be tradeoff between the reliability and the
cost of this approach.

Thanks again for the suggestion. I am looking forward to your comments.

Best,
Dong


On Tue, May 30, 2023 at 4:37 PM Piotr Nowojski  wrote:

> Hi again,
>
> Thanks Dong, yes I think your concerns are valid, and that's why I have
> previously refined my idea to use one of the backpressure measuring metrics
> that we already have.
> Either simply `isBackPressured == true` check [1], or
> `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That would
> address your three first concerns:
>   - lack of event time
>   - event time unreliability
>   - lack of 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi again,

Thanks Dong, yes I think your concerns are valid, and that's why I have
previously refined my idea to use one of the backpressure measuring metrics
that we already have.
Either simply `isBackPressured == true` check [1], or
`backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That would
address your three first concerns:
  - lack of event time
  - event time unreliability
  - lack of universal threshold value for `pendingRecords`

In a bit more detail, we probably should check (using [1] or [2]) either:
  a) if any of the source subtasks is backpressured
  b) if any of the subtasks is backpressured

In most cases a == b. The only time when that's not true, if some windowed
operator in the middle of the job graph started triggering so many results
that it became backpressured,
but the backpressure didn't last long enough to propagate to sources. For
example that especially might occur if sources are idle. So probably b) is
a better and more generic option.

Regarding your last concern, with spiky traffic, I think the following
algorithm of triggering checkpoints would work pretty well:

public class BackpressureDetectingCheckpointTrigger {

  private long lastCheckpointTs = System.currentTimeMillis();
  private long slowCheckpointInterval = ...;
  private long fastCheckpointInterval = ...;

  // code executed periodically, for example once a second, once every 10ms,
  // or at 1/10th of the fast checkpoint interval
  void maybeTriggerCheckpoint(...) {

    long nextCheckpointTs = lastCheckpointTs;
    if (isAnySubtaskBackpressured()) {
      nextCheckpointTs += slowCheckpointInterval;
    } else {
      nextCheckpointTs += fastCheckpointInterval;
    }

    // trigger once the chosen interval has elapsed since the last checkpoint
    if (System.currentTimeMillis() >= nextCheckpointTs) {
      triggerCheckpoint();
      lastCheckpointTs = System.currentTimeMillis();
    }
  }
}

This way, if there is a spike of backpressure, it doesn't matter that much.
If the backpressure goes away by the next iteration, the next check will
trigger a checkpoint according to the
fast interval. The slow checkpoint interval will be used only if the
backpressure persists for the whole duration of the slowCheckpointInterval.

We could also go a little bit more fancy, and instead of using only fast or
slow intervals, we could use a continuous spectrum to gradually adjust the
interval, by replacing the first if/else
check with a weighted average:

  int maxBackPressureTime = getSubtaskMaxBackPressuredTimeMsPerSecond();
  long nextCheckpointTs = lastCheckpointTs
      + (slowCheckpointInterval * maxBackPressureTime
          + fastCheckpointInterval * (1000 - maxBackPressureTime)) / 1000;

This would further eliminate some potential jitter and make the actual
checkpoint interval a bit more predictable.
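
For example, plugging in fastCheckpointInterval = 1s and slowCheckpointInterval
= 30s (the values from the config example earlier in this thread) and a
most-backpressured subtask reporting 100 ms/s, the weighted variant above would
yield an effective interval of:

  (30,000 ms * 100 + 1,000 ms * 900) / 1000 = 3,900 ms, i.e. roughly 3.9s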

Best,
Piotrek


On Tue, May 30, 2023 at 4:40 AM Dong Lin  wrote:

> Let me correct the typo in the last paragraph as below:
>
> To make the problem even harder, the incoming traffic can be spiky. And the
> overhead of triggering checkpointing can be relatively low, in which case
> it might be more performant (w.r.t. e2e lag) for the Flink job to
> checkpoint at the more frequent interval in the continuous phase in face of
> a spike in the number of pending records buffered in the source operator.
>
>
> On Tue, May 30, 2023 at 9:17 AM Dong Lin  wrote:
>
> > Hi Piotrek,
> >
> > Thanks for providing more details of the alternative approach!
> >
> > If I understand your proposal correctly, here are the requirements for it
> > to work without incurring any regression:
> >
> > 1) The source needs a way to determine whether there exists backpressure.
> > 2) If there is backpressure, then it means e2e latency is already high
> > and there should be no harm to use the less frequent checkpointing
> interval.
> > 3) The configuration of the "less frequent checkpointing interval" needs
> > to be a job-level config so that it works for sources other than
> > HybridSource.
> >
> > I would say that if we can find a way for the source to determine the
> > "existence of backpressure" and meet the requirement 2), it would indeed
> be
> > a much more elegant approach that solves more use-cases.
> >
> > The devil is in the details. I am not sure how to determine the
> "existence
> > of backpressure". Let me explain my thoughts and maybe you can help
> > provide the answers.
> >
> > To make the discussion more concrete, let's say the input records do not
> > have event timestamps. Users want to checkpoint at least once every 30
> > minutes to upper-bound the amount of duplicate work after job failover.
> And
> > users want to checkpoint at least once every 30 seconds to upper-bound
> *extra
> > e2e lag introduced by the Flink job* during the continuous processing
> > phase.
> >
> > Since the input records do not have event timestamps, we can not rely on
> > metrics such as currentFetchEventTimeLag [1] to determine the absolute
> e2e
> > lag, because currentFetchEventTimeLag depends on the existence of event
> > timestamps.
> >
> > Also note that, even if the input 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
Let me correct the typo in the last paragraph as below:

To make the problem even harder, the incoming traffic can be spiky. And the
overhead of triggering checkpointing can be relatively low, in which case
it might be more performant (w.r.t. e2e lag) for the Flink job to
checkpoint at the more frequent interval in the continuous phase in face of
a spike in the number of pending records buffered in the source operator.


On Tue, May 30, 2023 at 9:17 AM Dong Lin  wrote:

> Hi Piotrek,
>
> Thanks for providing more details of the alternative approach!
>
> If I understand your proposal correctly, here are the requirements for it
> to work without incurring any regression:
>
> 1) The source needs a way to determine whether there exists backpressure.
> 2) If there is backpressure, then it means e2e latency is already high
> and there should be no harm to use the less frequent checkpointing interval.
> 3) The configuration of the "less frequent checkpointing interval" needs
> to be a job-level config so that it works for sources other than
> HybridSource.
>
> I would say that if we can find a way for the source to determine the
> "existence of backpressure" and meet the requirement 2), it would indeed be
> a much more elegant approach that solves more use-cases.
>
> The devil is in the details. I am not sure how to determine the "existence
> of backpressure". Let me explain my thoughts and maybe you can help
> provide the answers.
>
> To make the discussion more concrete, let's say the input records do not
> have event timestamps. Users want to checkpoint at least once every 30
> minutes to upper-bound the amount of duplicate work after job failover. And
> users want to checkpoint at least once every 30 seconds to upper-bound *extra
> e2e lag introduced by the Flink job* during the continuous processing
> phase.
>
> Since the input records do not have event timestamps, we can not rely on
> metrics such as currentFetchEventTimeLag [1] to determine the absolute e2e
> lag, because currentFetchEventTimeLag depends on the existence of event
> timestamps.
>
> Also note that, even if the input records have event timestamps and we can
> measure currentFetchEventTimeLag, we still need a threshold to determine
> whether the value of currentFetchEventTimeLag is too high. One idea might
> be to use the user-specified "less frequent checkpointing interval" as
> this threshold, which in this case is 30 seconds. But this approach can
> also cause regression. For example, let's say the records go through
> several Kafka/MirrorMaker pipelines after it is generated and before it is
> received by Flink, causing its currentFetchEventTimeLag to be always higher
> than 30 seconds. Then Flink will end up always using the "less frequent
> checkpointing interval" in the continuous phase, which in this case is 30
> minutes.
>
> Other options to determine the "existence of backpressure" includes using
> the absolute number of records in the source storage system that are
> waiting to be fetched (e.g. pendingRecords [1]), or using the absolute
> number of buffered records in the source output queue. However, I find it
> hard to reliably determine "e2e latency is already high" based on the
> absolute number of records. What threshold should we choose to determine
> that the number of pending records is too many (and it is safe to increase
> the checkpointing interval)?
>
> To make the problem even harder, the incoming traffic can be spiky. And
> the overhead of triggering checkpointing can be relative low, in which case
> it might be more performance (w.r.t. e2e lag) for the Flink job to
> checkpoint at the higher interval in the continuous phase in face of a
> spike in the number of pending records buffered in the source operator.
>
> The problems described above are the main reasons that I can not find a
> way to make the alternative approach work. Any thoughts?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
>
> On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski 
> wrote:
>
>> Hi
>>
>> @Jing
>>
>> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
>> It
>> > makes sense to build it as a generic feature in Flink. Looking forward
>> to
>> > it. However, for some user cases, e.g. when users were aware of the
>> bounded
>> > sources (in the HybridSource) and care more about the throughput, the
>> > dynamic adjustment might not be required. Just let those bounded sources
>> > always have larger checkpoint intervals even when there is no back
>> > pressure. Because no one cares about latency in this case, let's turn
>> off
>> > the dynamic adjustment, reduce the checkpoint frequency, have better
>> > throughput, and save unnecessary source consumption. Did I miss anything
>> > here?
>>
>> But why do we need to have two separate mechanisms, if the dynamic
>> adjustment based on the backpressure/backlog would
>> achieve basically the same goal as your proposal and would 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
Hi Piotrek,

Thanks for providing more details of the alternative approach!

If I understand your proposal correctly, here are the requirements for it
to work without incurring any regression:

1) The source needs a way to determine whether there exists backpressure.
2) If there is backpressure, then it means e2e latency is already high and
there should be no harm to use the less frequent checkpointing interval.
3) The configuration of the "less frequent checkpointing interval" needs to
be a job-level config so that it works for sources other than HybridSource.

I would say that if we can find a way for the source to determine the
"existence of backpressure" and meet the requirement 2), it would indeed be
a much more elegant approach that solves more use-cases.

The devil is in the details. I am not sure how to determine the "existence
of backpressure". Let me explain my thoughts and maybe you can help provide
the answers.

To make the discussion more concrete, let's say the input records do not
have event timestamps. Users want to checkpoint at least once every 30
minutes to upper-bound the amount of duplicate work after job failover. And
users want to checkpoint at least once every 30 seconds to upper-bound *extra
e2e lag introduced by the Flink job* during the continuous processing
phase.
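
(To make the two intervals concrete, this is roughly how they would translate
into configuration. "execution.checkpointing.interval" is the existing Flink
option; the second key is only a placeholder name for the proposed job-level
"less frequent" interval, which does not exist yet.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Continuous phase: bound the extra e2e lag added by checkpointing to ~30 seconds.
        conf.setString("execution.checkpointing.interval", "30 s");
        // Backlog phase (placeholder key): bound duplicate work after failover to ~30 minutes.
        conf.setString("execution.checkpointing.interval.less-frequent", "30 min");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job with env ...
    }
}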

Since the input records do not have event timestamps, we can not rely on
metrics such as currentFetchEventTimeLag [1] to determine the absolute e2e
lag, because currentFetchEventTimeLag depends on the existence of event
timestamps.

Also note that, even if the input records have event timestamps and we can
measure currentFetchEventTimeLag, we still need a threshold to determine
whether the value of currentFetchEventTimeLag is too high. One idea might
be to use the user-specified "less frequent checkpointing interval" as this
threshold, which in this case is 30 seconds. But this approach can also
cause regression. For example, let's say the records go through several
Kafka/MirrorMaker pipelines after they are generated and before they are received
by Flink, causing their currentFetchEventTimeLag to always be higher than 30
seconds. Then Flink will end up always using the "less frequent
checkpointing interval" in the continuous phase, which in this case is 30
minutes.

Other options to determine the "existence of backpressure" include using
the absolute number of records in the source storage system that are
waiting to be fetched (e.g. pendingRecords [1]), or using the absolute
number of buffered records in the source output queue. However, I find it
hard to reliably determine "e2e latency is already high" based on the
absolute number of records. What threshold should we choose to determine
that the number of pending records is too many (and it is safe to increase
the checkpointing interval)?

To make the problem even harder, the incoming traffic can be spiky. And the
overhead of triggering checkpointing can be relatively low, in which case it
might be more performant (w.r.t. e2e lag) for the Flink job to keep checkpointing
at the more frequent interval in the continuous phase in the face of a spike in the
number of pending records buffered in the source operator.

The problems described above are the main reasons that I can not find a way
to make the alternative approach work. Any thoughts?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski 
wrote:

> Hi
>
> @Jing
>
> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
> It
> > makes sense to build it as a generic feature in Flink. Looking forward to
> > it. However, for some user cases, e.g. when users were aware of the
> bounded
> > sources (in the HybridSource) and care more about the throughput, the
> > dynamic adjustment might not be required. Just let those bounded sources
> > always have larger checkpoint intervals even when there is no back
> > pressure. Because no one cares about latency in this case, let's turn off
> > the dynamic adjustment, reduce the checkpoint frequency, have better
> > throughput, and save unnecessary source consumption. Did I miss anything
> > here?
>
> But why do we need to have two separate mechanisms, if the dynamic
> adjustment based on the backpressure/backlog would
> achieve basically the same goal as your proposal and would solve both of
> the problems? Having two independent solutions
> in the same codebase, in the docs, that are achieving basically the same
> thing is far from ideal. It would increase both the
> complexity of the system and confuse potential users.
>
> Moreover, as I have already mentioned before, I don't like the current
> proposal as it's focusing ONLY on the HybridSource,
> which can lead to an even worse problem in the future, where many different
> sources would have each a completely custom
> solution to solve the same/similar problems, complicating the system and
> confusing the users even more.
>
> @Dong,
>
> > For 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Piotr Nowojski
Hi

@Jing

> Your proposal to dynamically adjust the checkpoint intervals is elegant!
It
> makes sense to build it as a generic feature in Flink. Looking forward to
> it. However, for some user cases, e.g. when users were aware of the
bounded
> sources (in the HybridSource) and care more about the throughput, the
> dynamic adjustment might not be required. Just let those bounded sources
> always have larger checkpoint intervals even when there is no back
> pressure. Because no one cares about latency in this case, let's turn off
> the dynamic adjustment, reduce the checkpoint frequency, have better
> throughput, and save unnecessary source consumption. Did I miss anything
> here?

But why do we need to have two separate mechanisms, if the dynamic
adjustment based on the backpressure/backlog would
achieve basically the same goal as your proposal and would solve both of
the problems? Having two independent solutions
in the same codebase, in the docs, that are achieving basically the same
thing is far from ideal. It would increase both the
complexity of the system and confuse potential users.

Moreover, as I have already mentioned before, I don't like the current
proposal as it's focusing ONLY on the HybridSource,
which can lead to an even worse problem in the future, where many different
sources would have each a completely custom
solution to solve the same/similar problems, complicating the system and
confusing the users even more.

@Dong,

> For now I am not able to come up with a good way to support this. I am
happy to discuss the
> pros/cons if you can provide more detail (e.g. API design) regarding how
to support this approach

I have already described such proposal:

> Piotr:
> I don't know, maybe instead of adding this logic to operator
coordinators, `CheckpointCoordinator` should have a pluggable
`CheckpointTrigger`,
> that the user could configure like a `MetricReporter`. The default one
would be just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
`pendingRecords` for some source has exceeded the configured
> threshold and based on that adjust the checkpointing interval
accordingly? This would at least address some of my concerns.

plus

> Piotr:
>  Either way, I would like to refine my earlier idea, and instead of using
metrics like `pendingRecords`, I think we could switch between fast and
> slow checkpointing intervals based on the information if the job is
backpressured or not. My thinking is as follows:
>
> As a user, I would like to have my regular fast checkpointing interval
for low latency, but the moment my system is not keeping up, if the
backpressure
> builds up, or simply we have a huge backlog to reprocess, latency doesn't
matter anymore. Only throughput matters. So I would like the checkpointing
to slow down.
>
> I think this should cover pretty well most of the cases, what do you
think? If this backpressured based behaviour is still not enough, I would
still say
> that we should provide pluggable checkpoint triggering controllers that
would work based on metrics.

> change the checkpointing interval based on the "backlog signal",

What's wrong with the job being backpressured? If the job is backpressured, we
don't care about individual record latency, only about increasing
the throughput to get out of the backpressure situation ASAP.

> In the mentioned use-case, users want to have two different checkpointing
> intervals at different phases of the HybridSource. We should provide an
API
> for users to express the extra checkpointing interval in addition to the
> existing execution.checkpointing.interval. What would be the definition of
> that API with this alternative approach?

I think my proposal with `BacklogDynamicCheckpointTrigger` or
`BackpressureDetectingCheckpointTrigger` would solve your motivating use
case
just as well.

1. In the catch up phase (reading the bounded source):
  a) if we are under backpressure (the common case), the system would fall back
to the less frequent checkpointing interval
  b) if there is no backpressure (I hope a rare case: there is a backlog, but
the source is too slow), the Flink cluster has spare resources to actually run
the more frequent checkpointing interval. No harm should be done, though
arguably a less frequent checkpointing interval would be more desirable here.

2. In the continuous processing phase (unbounded source):
  a) if we are under backpressure, as I mentioned above, no one cares about the
checkpointing interval and the frequency of committing records to the output,
as e2e latency is already high due to the backlog in the sources
  b) if there is no backpressure, that's the only case where the user actually
cares about the frequency of committing records to the output, so we use the
more frequent checkpointing interval.

1b) is, I think, mostly harmless, and could be solved with some extra
effort
2a) and 2b) are not solved by your proposal
2a) and 2b) are 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Jing Ge
Hi Dong, Hi Piotr,

Thanks for the clarification.

@Dong

According to the code examples in the FLIP, I thought we are focusing on
the HybridSource scenario. With the current HybridSource implementation, we
don't even need to know the boundedness of sources in the HybridSource,
since all sources except the last one must be bounded[1], i.e. only the
last source is unbounded. This makes it much easier to set different
intervals to sources with different boundedness.
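
(For reference, the typical composition from the HybridSource documentation [2]:
one or more bounded sources first, e.g. a FileSource over the historical data,
followed by a single unbounded KafkaSource as the last source. The path, broker
and topic below are placeholders.)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

// Bounded historical data, read first.
FileSource<String> fileSource =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("hdfs:///historical/"))
        .build();

// Unbounded fresh data, read after the bounded source is exhausted.
KafkaSource<String> kafkaSource =
    KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Bounded source(s) first, the single unbounded source last.
HybridSource<String> hybridSource =
    HybridSource.builder(fileSource).addSource(kafkaSource).build();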

Boundedness in Flink is a top level concept. I think it should be ok to
introduce a top level config for the top level concept. I am not familiar
with MySQL CDC. For those specific cases, you are right, your proposal can
provide the feature with minimal changes, like I mentioned previously, it
is a thoughtful design.  +1

@Piotr

> For example join (windowed/temporal) of two tables backed by a hybrid
> source? I could easily see a scenario where one table with little data
> catches up much more quickly.

I am confused. I thought we were talking about HybridSource which "solves
the problem of sequentially reading input from heterogeneous sources to
produce a single input stream."[2]
I could not find any join within a HybridSource. So you might mean
something else with the join example, and it should be out of scope, if I am
not mistaken.

> About the (un)boundedness of the input stream. I'm not sure if that should
> actually matter. Actually the same issue, with too frequent checkpointing
> during a catch up period or when Flink is overloaded, could affect jobs
> that are purely unbounded, like continuously reading from Kafka. Even
more,
> nothing prevents users from actually storing bounded data in a Kafka
topic.
> Either way, I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:

This is again a very different use case from HybridSource. Users are indeed
allowed to store bounded data in a Kafka topic, and if it is not used as the
last source in a HybridSource, it is a bounded source and can still benefit
from a larger checkpoint interval w.r.t. throughput (Kafka or any other
storage does not matter). BTW, the larger checkpoint interval for a bounded
source is optional; users can use it but don't have to, if they don't care
about the throughput of the bounded data.

Your proposal to dynamically adjust the checkpoint intervals is elegant! It
makes sense to build it as a generic feature in Flink. Looking forward to
it. However, for some user cases, e.g. when users were aware of the bounded
sources (in the HybridSource) and care more about the throughput, the
dynamic adjustment might not be required. Just let those bounded sources
always have larger checkpoint intervals even when there is no back
pressure. Because no one cares about latency in this case, let's turn off
the dynamic adjustment, reduce the checkpoint frequency, have better
throughput, and save unnecessary source consumption. Did I miss anything
here?

Best regards,
Jing


[1]
https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source


On Thu, May 25, 2023 at 3:03 PM Dong Lin  wrote:

> Hi Piotr,
>
> Thanks for the discussion. Please see my comments inline.
>
> On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski 
> wrote:
>
> > Hi all,
> >
> > Thanks for the discussion.
> >
> > @Dong
> >
> > > In the target use-case, we would like the HybridSource to trigger
> > > checkpoints more frequently when it is reading the Kafka Source (than when it
> > > is reading the HDFS source). We would need to set a flag for the checkpoint
> > > trigger to know which source the HybridSource is reading from.
> >
> > Is this really your actual goal? Should users care if some table defined
> in
> >
>
> My actual goal is to address the use-case described in the motivation
> section. More specifically,
> my goal is to provide an API that users can use to express their needed
> checkpointing interval at different phases of the job, so that Flink can
> achieve the maximum throughput while also meeting users' needs for data
> freshness and failover time.
>
>
> > a meta store is backed by HybridSource or not? I think the actual goal is
> > this:
>
> As a user I would like to have a self adjusting mechanism for checkpointing
> > intervals, so that during the catch up phase my job focuses on throughput
> > to catch up ASAP, while during normal processing (without a large
> backlog)
> > Flink is trying to minimize e2e latency.
> >
>
> Sure. It will be great to have a way to support this self-adjusting
> mechanism. For now I am not able
> to come up with a good way to support this. I am happy to 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Dong Lin
Hi Piotr,

Thanks for the discussion. Please see my comments inline.

On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski  wrote:

> Hi all,
>
> Thanks for the discussion.
>
> @Dong
>
> > In the target use-case, we would like the HybridSource to trigger
> > checkpoints more frequently when it is reading the Kafka Source (than when it
> > is reading the HDFS source). We would need to set a flag for the checkpoint
> > trigger to know which source the HybridSource is reading from.
>
> Is this really your actual goal? Should users care if some table defined in
>

My actual goal is to address the use-case described in the motivation
section. More specifically, my goal is to provide an API that users can use
to express their needed checkpointing interval at different phases of the
job, so that Flink can achieve the maximum throughput while also meeting
users' needs for data freshness and failover time.


> a meta store is backed by HybridSource or not? I think the actual goal is
> this:

As a user I would like to have a self adjusting mechanism for checkpointing
> intervals, so that during the catch up phase my job focuses on throughput
> to catch up ASAP, while during normal processing (without a large backlog)
> Flink is trying to minimize e2e latency.
>

Sure. It will be great to have a way to support this self-adjusting
mechanism. For now I am not able
to come up with a good way to support this. I am happy to discuss the
pros/cons if you can provide
more detail (e.g. API design) regarding how to support this approach.


>
> Am I right here?
>
> > there won't exist any "*conflicting* desired checkpoint trigger" by
> definition
>
> Ok, arguably there won't be a conflict, but the decision to pick the minimum
> of the upper bounds might be sub-optimal.
>

As of today, users need checkpoints in order to address two goals. One goal
is to upper-bound data staleness when there is a sink with exactly-once
semantics (e.g. Paimon), since those sinks can only output data when a
checkpoint is triggered. The other goal is to upper-bound the amount of
duplicate work needed after failover.

In both cases, users need to upper-bound the checkpointing interval. This
makes it more intuitive
for the config to only express the checkpointing interval upper-bound.


>
> > Overall, I am not sure we always want to have a longer checkpointing
> > interval. That really depends on the specific use-case and the job graph.
>
> Yes, that's why I proposed something a little bit more generic.
>

I am not sure I fully understand the alternative proposal that is meant to
be more generic. So it is hard for me to evaluate the pros/cons.

I understand that you would prefer the source operator to use the REST API
to trigger checkpoints. This sounds like a downside, since using the REST API
is not as easy as using the programming API proposed in the FLIP.

Can you help explain the generic approach more concretely, such as the APIs
you would suggest introducing? That would
allow me to evaluate the pros/cons and hopefully pick the best option.


>
> > I believe there can be use-case where
> > the proposed API is not useful, in which case users can choose not to use
> > the API without incurring any performance regression.
>
> I'm not saying that this proposal is not useful. Just that we might be able
> to solve this problem in a more flexible manner. If we introduce a
> partially working solution now at the source level, and later we will still
> need a different solution on another level to cover other use cases, that
> would clog the API and confuse users.
>

Can you explain why this is "partially working"? Is it because there are
use-cases that should be addressed but are not already covered by the
proposed approach?

If so, can you help explain the use-case that would be useful to address?
With concrete
use-cases in mind, we can pick the API with minimal change to address these
use-cases.


>
> @Jing
>
> > @Piotr
> > Just out of curiosity, do you know any real use cases where real-time
> data is processed before the backlog?
>
> For example join (windowed/temporal) of two tables backed by a hybrid
> source? I could easily see a scenario where one table with little data
> catches up much more quickly.
>
> @Jing and @Dong
>
> About the (un)boundedness of the input stream. I'm not sure if that should
> actually matter. Actually the same issue, with too frequent checkpointing
>

Indeed, I agree with you on this point and prefer not to have this proposal
depend on the (un)boundedness.


> during a catch up period or when Flink is overloaded, could affect jobs
> that are purely unbounded, like continuously reading from Kafka. Even more,
> nothing prevents users from actually storing bounded data in a Kafka topic.
> Either way, I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
>
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Piotr Nowojski
Hi all,

Thanks for the discussion.

@Dong

> In the target use-case, we would like the HybridSource to trigger
> checkpoints more frequently when it is reading the Kafka Source (than when it
> is reading the HDFS source). We would need to set a flag for the checkpoint
> trigger to know which source the HybridSource is reading from.

Is this really your actual goal? Should users care if some table defined in
a meta store is backed by HybridSource or not? I think the actual goal is
this:
As a user I would like to have a self adjusting mechanism for checkpointing
intervals, so that during the catch up phase my job focuses on throughput
to catch up ASAP, while during normal processing (without a large backlog)
Flink is trying to minimize e2e latency.

Am I right here?

> there won't exist any "*conflicting* desired checkpoint trigger" by
definition

Ok, arguably there won't be a conflict, but the decision to pick the minimum
of the upper bounds might be sub-optimal.

> Overall, I am not sure we always want to have a longer checkpointing
> interval. That really depends on the specific use-case and the job graph.

Yes, that's why I proposed something a little bit more generic.

> I believe there can be use-case where
> the proposed API is not useful, in which case users can choose not to use
> the API without incurring any performance regression.

I'm not saying that this proposal is not useful. Just that we might be able
to solve this problem in a more flexible manner. If we introduce a
partially working solution now at the source level, and later we will still
need a different solution on another level to cover other use cases, that
would clog the API and confuse users.

@Jing

> @Piotr
> Just out of curiosity, do you know any real use cases where real-time
data is processed before the backlog?

For example join (windowed/temporal) of two tables backed by a hybrid
source? I could easily see a scenario where one table with little data
catches up much more quickly.

@Jing and @Dong

About the (un)boundedness of the input stream. I'm not sure if that should
actually matter. Actually the same issue, with too frequent checkpointing
during a catch up period or when Flink is overloaded, could affect jobs
that are purely unbounded, like continuously reading from Kafka. Even more,
nothing prevents users from actually storing bounded data in a Kafka topic.
Either way, I would like to refine my earlier idea, and instead of using
metrics like `pendingRecords`, I think we could switch between fast and
slow checkpointing intervals based on the information if the job is
backpressured or not. My thinking is as follows:

As a user, I would like to have my regular fast checkpointing interval for
low latency, but the moment my system is not keeping up, if the
backpressure builds up, or simply we have a huge backlog to reprocess,
latency doesn't matter anymore. Only throughput matters. So I would like
the checkpointing to slow down.

I think this should cover pretty well most of the cases, what do you think?
If this backpressure-based behaviour is still not enough, I would still
say that we should provide pluggable checkpoint triggering controllers that
would work based on metrics.
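
(To illustrate the fast/slow switch, plus one way it could be guarded against
short traffic spikes by only slowing down once backpressure has been sustained
for a while. This is purely an illustration, not part of any proposal, and none
of the names below exist in Flink.)

import java.time.Duration;

// Illustrative only: switch between the fast and slow checkpointing interval based on
// a backpressure signal, requiring the backpressured state to persist for a minimum
// time before slowing down, so that short traffic spikes do not flip the interval.
final class BackpressureBasedIntervalSwitch {

    private final Duration fastInterval;
    private final Duration slowInterval;
    private final long minBackpressuredMillis;
    private long backpressuredSinceMillis = -1L;

    BackpressureBasedIntervalSwitch(Duration fastInterval, Duration slowInterval, Duration minBackpressured) {
        this.fastInterval = fastInterval;
        this.slowInterval = slowInterval;
        this.minBackpressuredMillis = minBackpressured.toMillis();
    }

    /** Called periodically with the current backpressure status; returns the interval to use. */
    Duration onSample(boolean backpressured, long nowMillis) {
        if (!backpressured) {
            backpressuredSinceMillis = -1L;   // back to normal: low latency matters again
            return fastInterval;
        }
        if (backpressuredSinceMillis < 0L) {
            backpressuredSinceMillis = nowMillis;
        }
        boolean sustained = nowMillis - backpressuredSinceMillis >= minBackpressuredMillis;
        return sustained ? slowInterval : fastInterval;   // only slow down for sustained backpressure
    }
}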

Best,
Piotrek

Thu, May 25, 2023 at 07:47 Dong Lin  wrote:

> Hi Jing,
>
> Thanks for your comments!
>
> Regarding the idea of using the existing "boundedness" attribute of
> sources, that is indeed something that we might find intuitive initially. I
> have thought about this idea, but could not find a good way to make it
> work. I will try to explain my thoughts and see if we can find a better
> solution.
>
> Here is my understanding of the idea mentioned above: provide a job level
> config execution.checkpoint.interval.bounded. Flink will use this as the
> checkpointing interval whenever there exists at least one running source
> which claims it is under the "bounded" stage.
>
> Note that we can not simply re-use the existing "boundedness" attribute of
> source operators. The reason is that for sources such as MySQL CDC, its
> boundedness can be "continuous_unbounded" because it can run continuously.
> But MySQL CDC has two phases internally, where the source needs to first
> read a snapshot (with bounded amount of data) and then read a binlog (with
> unbounded amount of data).
>
> As a result, in order to support optimization for sources like MySQL CDC, we
> need to expose an API for the source operator to declare whether it is
> running at a bounded or continuous_unbounded stage. *This introduces the
> need to define a new concept named "bounded stage".*
>
> Then, we will need to *introduce a new contract between source operators
> and the Flink runtime*, saying that if there is a source that claims it is
> running at the bounded stage, then Flink will use the "
> execution.checkpoint.interval.bounded" as the checkpointing interval.
>
> Here are the concerns I have with this approach:
>
> - The execution.checkpoint.interval.bounded is a top-level 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-24 Thread Dong Lin
Hi Jing,

Thanks for your comments!

Regarding the idea of using the existing "boundedness" attribute of
sources, that is indeed something that we might find intuitive initially. I
have thought about this idea, but could not find a good way to make it
work. I will try to explain my thoughts and see if we can find a better
solution.

Here is my understanding of the idea mentioned above: provide a job level
config execution.checkpoint.interval.bounded. Flink will use this as the
checkpointing interval whenever there exists at least one running source
which claims it is under the "bounded" stage.

Note that we can not simply re-use the existing "boundedness" attribute of
source operators. The reason is that for a source such as MySQL CDC, its
boundedness can be "continuous_unbounded" because it can run continuously.
But MySQL CDC has two phases internally, where the source needs to first
read a snapshot (with bounded amount of data) and then read a binlog (with
unbounded amount of data).

As a result, in order to support optimization for sources like MySQL CDC, we
need to expose an API for the source operator to declare whether it is
running at a bounded or continuous_unbounded stage. *This introduces the
need to define a new concept named "bounded stage".*

Then, we will need to *introduce a new contract between source operators
and the Flink runtime*, saying that if there is a source that claims it is
running at the bounded stage, then Flink will use the "
execution.checkpoint.interval.bounded" as the checkpointing interval.

Here are the concerns I have with this approach:

- The execution.checkpoint.interval.bounded is a top-level config, meaning
that every Flink user needs to read about its semantics. In comparison, the
proposed approach only requires users of specific sources (e.g.
HybridSource, MySQL CDC) to know the new source-specific config.

- It introduces a new top-level concept in Flink to describe the internal
stages of specific sources (e.g. MySQL CDC). In comparison, the proposed
approach only requires users of specific sources (e.g. HybridSource, MySQL
CDC) to know this concept, which not only makes the explanation much
simpler (since they are already using the specific sources), but also
limits the scope of this new concept (only these users need to know this
concept).

- It makes the existing config execution.checkpoint.interval harder to
understand, because we would need to explain that it is only used when there
is no source in a "bounded stage", introducing more if-else for this config.
In comparison, with the proposed approach, the semantics of
execution.checkpoint.interval is simpler, without if/else, as it will always
be applied regardless of which sources users are using.

I am happy to discuss if there are better approaches.

Thanks,
Dong


On Wed, May 24, 2023 at 8:23 AM Jing Ge  wrote:

> Hi Yunfeng, Hi Dong
>
> Thanks for the informative discussion! It is a rational requirement to set
> different checkpoint intervals for different sources in a hybridSource. The
> tiny downside of this proposal, at least for me, is that I have to
> understand the upper-bound definition of the interval and the built-in rule
> for Flink to choose the minimum value between it and the default interval
> setting. However, afaiac, the intention of this built-in rule is to
> minimize changes in Flink to support the requested feature, which is a very
> thoughtful move. Thanks for taking care of it. +1 for the Proposal.
>
> Another very rough idea came to my mind while I was reading the FLIP.
> I didn't do a deep dive with related source code yet, so please correct me
> if I am wrong. The use case shows that two different checkpoint intervals
> should be set for bounded(historical) stream and unbounded(fresh real-time)
> stream sources. It is a trade-off between throughput and latency, i.e.
> bounded stream with large checkpoint interval for better throughput and
> unbounded stream with small checkpoint interval for lower latency (in case
> of failover). As we could see that the different interval setting depends
> on the boundedness of streams. Since the Source API already has its own
> boundedness flag[1], is it possible to define two interval configurations
> and let Flink automatically set the related one to the source based on the
> known boundedness? The interval for bounded stream could be like
> execution.checkpoint.interval.bounded(naming could be reconsidered), and
> the other one for unbounded stream, we could use the existing one
> execution.checkpoint.interval by default, or introduce a new one like
> execution.checkpoint.interval.unbounded. In this way, no API change is
> required.
>
> @Piotr
> Just out of curiosity, do you know any real use cases where real-time data
> is processed before the backlog? Semantically, the backlog contains
> historical data that has to be processed before the real-time data is
> allowed to be processed. Otherwise, up-to-date data will be overwritten by
> out-of-date 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Jing Ge
Hi Yunfeng, Hi Dong

Thanks for the informative discussion! It is a rational requirement to set
different checkpoint intervals for different sources in a hybridSource. The
tiny downside of this proposal, at least for me, is that I have to
understand the upper-bound definition of the interval and the built-in rule
for Flink to choose the minimum value between it and the default interval
setting. However, afaiac, the intention of this built-in rule is to
minimize changes in Flink to support the requested feature, which is a very
thoughtful move. Thanks for taking care of it. +1 for the Proposal.

Another very rough idea came to my mind while I was reading the FLIP.
I didn't do a deep dive with related source code yet, so please correct me
if I am wrong. The use case shows that two different checkpoint intervals
should be set for bounded(historical) stream and unbounded(fresh real-time)
stream sources. It is a trade-off between throughput and latency, i.e.
bounded stream with large checkpoint interval for better throughput and
unbounded stream with small checkpoint interval for lower latency (in case
of failover). As we could see that the different interval setting depends
on the boundedness of streams. Since the Source API already has its own
boundedness flag[1], is it possible to define two interval configurations
and let Flink automatically set the related one to the source based on the
known boundedness? The interval for bounded stream could be like
execution.checkpoint.interval.bounded(naming could be reconsidered), and
the other one for unbounded stream, we could use the existing one
execution.checkpoint.interval by default, or introduce a new one like
execution.checkpoint.interval.unbounded. In this way, no API change is
required.

@Piotr
Just out of curiosity, do you know any real use cases where real-time data
is processed before the backlog? Semantically, the backlog contains
historical data that has to be processed before the real-time data is
allowed to be processed. Otherwise, up-to-date data will be overwritten by
out-of-date data, which leads to unexpected results in real business
scenarios.


Best regards,
Jing

[1]
https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41

On Tue, May 23, 2023 at 5:53 PM Dong Lin  wrote:

> Hi Piotr,
>
> Thanks for the comments. Let me try to understand your concerns and
> hopefully address the concerns.
>
> >> What would happen if there are two (or more) operator coordinators with
> conflicting desired checkpoint trigger behaviour
>
> With the proposed change, there won't exist any "*conflicting* desired
> checkpoint trigger" by definition. Both job-level config and the proposed
> API upperBoundCheckpointingInterval() means the upper-bound of the
> checkpointing interval. If there are different upper-bounds proposed by
> different source operators and the job-level config, Flink will try to
> periodically trigger checkpoints at the interval corresponding to the
> minimum of all these proposed upper-bounds.
>
> >> If one source is processing a backlog and the other is already
> processing real time data..
>
> Overall, I am not sure we always want to have a longer checkpointing
> interval. That really depends on the specific use-case and the job graph.
>
> The proposed API change provides a mechanism for operators and users to specify
> different checkpoint intervals during different periods of the job. Users have
> the option to use the new API to get better performance in the use-case
> specified in the motivation section. I believe there can be use-case where
> the proposed API is not useful, in which case users can choose not to use
> the API without incurring any performance regression.
>
> >> it might be a bit confusing and not user friendly to have multiple
> places that can override the checkpointing behaviour in a different way
>
> Admittedly, adding more APIs always incurs more complexity. But sometimes we
> have to incur this complexity to address new use-cases. Maybe we can see if
> there is a more user-friendly way to address this use-case.
>
> >> already implemented and is simple from the perspective of Flink
>
> Do you mean that the HybridSource operator should invoke the REST API to
> trigger checkpoints? The downside of this approach is that it makes it hard
> for developers of source operators (e.g. MySQL CDC, HybridSource) to
> address the target use-case. AFAIK, there is no existing case where we
> require operator developers to use REST API to do their job.
>
> Can you help explain the benefit of using REST API over using the proposed
> API?
>
> Note that this approach also seems to have the same downside mentioned
> above: "multiple places that can override the checkpointing behaviour". I
> am not sure there can be a solution to address the target use-case without
> having multiple places that can affect the checkpointing behavior.
>
> >> check if 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to understand your concerns and
hopefully address the concerns.

>> What would happen if there are two (or more) operator coordinators with
conflicting desired checkpoint trigger behaviour

With the proposed change, there won't exist any "*conflicting* desired
checkpoint trigger" by definition. Both job-level config and the proposed
API upperBoundCheckpointingInterval() means the upper-bound of the
checkpointing interval. If there are different upper-bounds proposed by
different source operators and the job-level config, Flink will try to
periodically trigger checkpoints at the interval corresponding to the
minimum of all these proposed upper-bounds.
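
(As a small illustration of the intended semantics, not actual Flink code:)

import java.time.Duration;
import java.util.List;

// Illustrative only: the effective interval is the minimum of the job-level
// interval and every upper bound proposed by a source operator.
final class EffectiveIntervalExample {

    static Duration effectiveInterval(Duration jobLevelInterval, List<Duration> proposedUpperBounds) {
        Duration result = jobLevelInterval;
        for (Duration bound : proposedUpperBounds) {
            if (bound.compareTo(result) < 0) {
                result = bound;   // the tightest upper bound wins
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Job-level interval of 30 minutes; one source proposes a 30 second upper
        // bound while it is reading the Kafka phase.
        System.out.println(
                effectiveInterval(Duration.ofMinutes(30), List.of(Duration.ofSeconds(30))));
        // prints PT30S
    }
}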

>> If one source is processing a backlog and the other is already
processing real time data..

Overall, I am not sure we always want to have a longer checkpointing
interval. That really depends on the specific use-case and the job graph.

The proposed API change provides a mechanism for operators and users to specify
different checkpoint intervals during different periods of the job. Users have
the option to use the new API to get better performance in the use-case
specified in the motivation section. I believe there can be use-case where
the proposed API is not useful, in which case users can choose not to use
the API without incurring any performance regression.

>> it might be a bit confusing and not user friendly to have multiple
places that can override the checkpointing behaviour in a different way

Admittedly, adding more APIs always incurs more complexity. But sometimes we
have to incur this complexity to address new use-cases. Maybe we can see if
there is a more user-friendly way to address this use-case.

>> already implemented and is simple from the perspective of Flink

Do you mean that the HybridSource operator should invoke the REST API to
trigger checkpoints? The downside of this approach is that it makes it hard
for developers of source operators (e.g. MySQL CDC, HybridSource) to
address the target use-case. AFAIK, there is no existing case where we
require operator developers to use REST API to do their job.

Can you help explain the benefit of using REST API over using the proposed
API?

Note that this approach also seems to have the same downside mentioned
above: "multiple places that can override the checkpointing behaviour". I
am not sure there can be a solution to address the target use-case without
having multiple places that can affect the checkpointing behavior.

>> check if `pendingRecords` for some source has exceeded the configured
threshold and based on that adjust the checkpointing interval accordingly

I am not sure this approach can address the target use-case in a better
way. In the target use-case, we would like the HybridSource to trigger
checkpoints more frequently when it is reading the Kafka Source (than when it
is reading the HDFS source). We would need to set a flag for the checkpoint
trigger to know which source the HybridSource is reading from. But IMO that
approach is less intuitive and more complex than having the HybridSource
invoke upperBoundCheckpointingInterval() directly once it is reading the
Kafka Source.
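
(For illustration, this is roughly how I would expect a source to use the
proposed API once it switches to the Kafka phase. Where exactly
upperBoundCheckpointingInterval() is exposed is part of the FLIP design; the
context interface and the callback below are made up just to show the intent.)

import java.time.Duration;

// Purely illustrative: the context interface and the callback below are made up
// to show how the proposed upperBoundCheckpointingInterval() could be used.
interface CheckpointIntervalContext {
    void upperBoundCheckpointingInterval(Duration upperBound);
}

class HybridSourcePhaseListener {

    private final CheckpointIntervalContext context;

    HybridSourcePhaseListener(CheckpointIntervalContext context) {
        this.context = context;
    }

    // Hypothetical hook invoked when the HybridSource finishes the bounded HDFS
    // phase and starts reading the unbounded Kafka source.
    void onSwitchedToKafka() {
        // Continuous phase: ask Flink to checkpoint at least once every 30 seconds.
        context.upperBoundCheckpointingInterval(Duration.ofSeconds(30));
    }
}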

Maybe I did not understand the alternative approach rightly. I am happy to
discuss more on this topic. WDYT?


Best,
Dong

On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski 
wrote:

> Hi,
>
> Thanks for the proposal. However, are you sure that the
> OperatorCoordinator is the right place for such logic? What would
> happen if there are two (or more) operator coordinators with conflicting
> desired checkpoint trigger behaviour? If one source is processing a backlog
> and the other is already processing real time data, I would assume that in
> most use cases you would like to still have the longer checkpointing
> interval, not the shorter one. Also apart from that, it might be a bit
> confusing and not user friendly to have multiple places that can override
> the checkpointing behaviour in a different way.
>
> FYI, in the past, we had some discussions about similar requests and back
> then we chose to keep the system simpler, and exposed a more generic REST
> API checkpoint triggering mechanism. I know that having to implement such
> logic outside of Flink and having to call REST calls to trigger checkpoints
> might not be ideal, but that's already implemented and is simple from the
> perspective of Flink.
>
> I don't know, maybe instead of adding this logic to operator coordinators,
> `CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that
> the user could configure like a `MetricReporter`. The default one would be
> just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured threshold and
> based on that adjust the checkpointing interval accordingly? This would at
> least address some of my concerns.
>
> WDYT?
>
> Best,
> Piotrek
>
> [1]
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Piotr Nowojski
Hi,

Thanks for the proposal. However, are you sure that the OperatorCoordinator
is the right place for such logic? What would happen if there are two
(or more) operator coordinators with conflicting desired checkpoint trigger
behaviour? If one source is processing a backlog and the other is already
processing real time data, I would assume that in most use cases you would
like to still have the longer checkpointing interval, not the shorter one.
Also apart from that, it might be a bit confusing and not user friendly to
have multiple places that can override the checkpointing behaviour in a
different way.

FYI, in the past, we had some discussions about similar requests and back
then we chose to keep the system simpler, and exposed a more generic REST
API checkpoint triggering mechanism. I know that having to implement such
logic outside of Flink and having to call REST calls to trigger checkpoints
might not be ideal, but that's already implemented and is simple from the
perspective of Flink.

I don't know, maybe instead of adding this logic to operator coordinators,
`CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that
the user could configure like a `MetricReporter`. The default one would be
just periodically triggering checkpoints. Maybe
`BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
`pendingRecords` for some source has exceeded the configured threshold and
based on that adjust the checkpointing interval accordingly? This would at
least address some of my concerns.
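
(A rough sketch of what I have in mind, purely illustrative: there is no
pluggable CheckpointTrigger in Flink today, and all the names below are made
up.)

import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical sketch only, to illustrate the idea above.
interface CheckpointTrigger {
    /** Returns the checkpointing interval the CheckpointCoordinator should currently use. */
    Duration currentInterval();
}

class BacklogDynamicCheckpointTrigger implements CheckpointTrigger {

    private final LongSupplier pendingRecords;   // e.g. backed by the FLIP-33 pendingRecords metric
    private final long pendingRecordsThreshold;
    private final Duration fastInterval;
    private final Duration slowInterval;

    BacklogDynamicCheckpointTrigger(
            LongSupplier pendingRecords,
            long pendingRecordsThreshold,
            Duration fastInterval,
            Duration slowInterval) {
        this.pendingRecords = pendingRecords;
        this.pendingRecordsThreshold = pendingRecordsThreshold;
        this.fastInterval = fastInterval;
        this.slowInterval = slowInterval;
    }

    @Override
    public Duration currentInterval() {
        // Large backlog in the source: favour throughput with the slow interval,
        // otherwise keep the fast interval for low latency.
        return pendingRecords.getAsLong() > pendingRecordsThreshold ? slowInterval : fastInterval;
    }
}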

WDYT?

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

Tue, May 9, 2023 at 19:11 Yunfeng Zhou 
wrote:

> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> support dynamically triggering checkpoints from operators, which has
> been documented in FLIP-309
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> >.
>
> With the help of the ability proposed in this FLIP, users could
> improve the performance of their Flink job in cases like when the job
> needs to process both historical batch data and real-time streaming
> data, by adjusting the checkpoint triggerings in different phases of a
> HybridSource or CDC source.
>
> This proposal would be a fundamental component in the effort to
> further unify Flink's batch and stream processing ability. Please feel
> free to reply to this email thread and share with us your opinions.
>
> Best regards.
>
> Dong and Yunfeng
>


[DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-09 Thread Yunfeng Zhou
Hi all,

Dong(cc'ed) and I are opening this thread to discuss our proposal to
support dynamically triggering checkpoints from operators, which has
been documented in FLIP-309
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.

With the help of the ability proposed in this FLIP, users could
improve the performance of their Flink job in cases like when the job
needs to process both historical batch data and real-time streaming
data, by adjusting the checkpoint triggering in different phases of a
HybridSource or CDC source.

This proposal would be a fundamental component in the effort to
further unify Flink's batch and stream processing ability. Please feel
free to reply to this email thread and share with us your opinions.

Best regards.

Dong and Yunfeng