Hi Piotr,

Thanks for the discussion. Please see my comments inline.

On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi all,
>
> Thanks for the discussion.
>
> @Dong
>
> > In the target use-case, we would like to HybridSource to trigger>
> checkpoint more frequently when it is read the Kafka Source (than when it
> > is reading the HDFS source). We would need to set a flag for the
> checkpoint
> > trigger to know which source the HybridSource is reading from.
>
> Is this really your actual goal? Should users care if some table defined in
>

My actual goal is to address the use-case described in the motivation
section. More specifically,
my goal is to provide API that uses can use to express their needed
checkpointing interval
at different phases of the job. So that Flink can achieve the maximum
throughput while also meeting
users' need for data freshness and failover time.


> a meta store is backed by HybridSource or not? I think the actual goal is
> this:

As a user I would like to have a self adjusting mechanism for checkpointing
> intervals, so that during the catch up phase my job focuses on throughput
> to catch up ASAP, while during normal processing (without a large backlog)
> Flink is trying to minimize e2e latency.
>

Sure. It will be great to have a way to support this self-adjusting
mechanism. For now I am not able
to come up with a good way to support this. I am happy to discuss the
pros/cons if you can provide
more detail (e.g. API design) regarding how to support this approach.


>
> Am I right here?
>
> > there won't exist any "*conflicting* desired checkpoint trigger" by
> definition
>
> Ok, arguably there won't be a conflict, but the decision to pick minimum
> out of the upper bounds might be sub-optimal.
>

As of today, users need checkpoint in order to address two goals. One goal
is to upper-bound
data staleness when there is sink with exactly-once semantics (e.g.
Paimon), since those sinks
can only output data when checkpoint is triggered. The other goal is to
upper-bound the amount of
duplicate work needed after failover.

In both cases, users need to upper-bound the checkpointing interval. This
makes it more intuitive
for the config to only express the checkpointing interval upper-bound.


>
> > Overall, I am not sure we always want to have a longer checkpointing
> > interval. That really depends on the specific use-case and the job graph.
>
> Yes, that's why I proposed something a little bit more generic.
>

I am not sure I fully understand the alternative proposal that is meant to
be more generic. So it is hard for me to evaluate the pros/cons.

I understand that you preferred for the source operator to use the REST API
to trigger checkpoints. This sounds
like a downside since using REST API is not as easy as using the
programming API proposed in the FLIP.

Can you help explain the generic approach more concretely, such as the APIs
you would suggest introducing? That would
allow me to evaluate the pros/cons and hopefully pick the best option.


>
> > I believe there can be use-case where
> > the proposed API is not useful, in which case users can choose not to use
> > the API without incurring any performance regression.
>
> I'm not saying that this proposal is not useful. Just that we might be able
> to solve this problem in a more flexible manner. If we introduce a
> partially working solution now at the source level, and later we will still
> need a different solution on another level to cover other use cases, that
> would clog the API and confuse users.
>

Can you explain why this is "partially working"? Is it because there are
use-cases that should
be addressed but not already covered by the proposed approach?

If so, can you help explain the use-case that would be useful to address?
With concrete
use-cases in mind, we can pick the API with minimal change to address these
use-cases.


>
> @Jing
>
> > @Piotr
> > Just out of curiosity, do you know any real use cases where real-time
> data is processed before the backlog?
>
> For example join (windowed/temporal) of two tables backed by a hybrid
> source? I could easily see a scenario where one table with little data
> catches up much more quickly.
>
> @Jing and @Dong
>
> About the (un)boundness of the input stream. I'm not sure if that should
> actually matter. Actually the same issue, with two frequent checkpointing
>

Indeed, I agree with you on this point and prefer not to have this proposal
depend on the (un)boundness.


> during a catch up period or when Flink is overloaded, could affect jobs
> that are purely unbounded, like continuously reading from Kafka. Even more,
> nothing prevents users from actually storing bounded data in a Kafka topic.
> Either way, I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
>
> As a user, I would like to have my regular fast checkpointing interval for
> low latency, but the moment my system is not keeping up, if the
> backpressure builds up, or simply we have a huge backlog to reprocess,
> latency doesn't matter anymore. Only throughput matters. So I would like
> the checkpointing to slow down.
>
> I think this should cover pretty well most of the cases, what do you think?
>

Thank you for all the comments and this idea. I like this idea. We actually
thought about this idea before proposing this FLIP.

In order to make this idea work, we need to come-up with a good algorithm
that can dynamically change the checkpointing interval based on the
"backlog signal", without causing regression w.r.t. failover time and data
freshness. I find it hard to come up with this algorithm due to
insufficient "backlog signal".

For the use-case mentioned in the motivation section, the data in the
source does not have event timestamps to help determine the amount of
backlog. So the only source-of-truth for determining backlog is the amount
of data buffered in operators. But the buffer size is typically chosen to
be proportional to round-trip-time and throughput. Having a full buffer
does not necessarily mean that the data is lagging behind. And increasing
the checkpointing interval with insufficient "backlog signal" can have a
negative impact on data freshness and failover time.

In order to make this idea work, we would need to *provide* that the
algorithm would not negatively hurt data freshness and failover time when
it decides to increase checkpointing intervals. For now I cold not come up
with such an algorithm.

If this backpressured based behaviour is still not enough, I would still
> say that we should provide plugable checkpoint triggering controllers that
> would work based on metrics.


I am not sure how to address the use-case mentioned in the motivation
section, with the pluggable checkpoint trigger + metrics. Can you help
provide the definition of these APIs and kindly explain how that works to
address the mentioned use-case.

In the mentioned use-case, users want to have two different checkpointing
intervals at different phases of the HybridSource. We should provide an API
for users to express the extra checkpointing interval in addition to the
existing execution.checkpointing.interval. What would be the definition of
that API with this alternative approach?

Best,
Dong


>
> Best,
> Piotrek
>
> czw., 25 maj 2023 o 07:47 Dong Lin <lindon...@gmail.com> napisał(a):
>
> > Hi Jing,
> >
> > Thanks for your comments!
> >
> > Regarding the idea of using the existing "boundedness" attribute of
> > sources, that is indeed something that we might find intuitive
> initially. I
> > have thought about this idea, but could not find a good way to make it
> > work. I will try to explain my thoughts and see if we can find a better
> > solution.
> >
> > Here is my understanding of the idea mentioned above: provide a job level
> > config execution.checkpoint.interval.bounded. Flink will use this as the
> > checkpointing interval whenever there exists at least one running source
> > which claims it is under the "bounded" stage.
> >
> > Note that we can not simply re-use the existing "boundedness" attribute
> of
> > source operators. The reason is that for sources such as MySQL CDC, its
> > boundedness can be "continuous_unbounded" because it can run
> continuously.
> > But MySQL CDC has two phases internally, where the source needs to first
> > read a snapshot (with bounded amount of data) and then read a binlog
> (with
> > unbounded amount of data).
> >
> > As a result, in order to support optimization for souces like MySQL CDC,
> we
> > need to expose an API for the source operator to declare whether it is
> > running at a bounded or continuous_unbounded stage. *This introduces the
> > need to define a new concept named "bounded stage".*
> >
> > Then, we will need to *introduce a new contract between source operators
> > and the Flink runtime*, saying that if there is a source that claims it
> is
> > running at the bounded stage, then Flink will use the "
> > execution.checkpoint.interval.bounded" as the checkpointing interval.
> >
> > Here are the the concerns I have with this approach:
> >
> > - The execution.checkpoint.interval.bounded is a top-level config,
> meaning
> > that every Flink user needs to read about its semantics. In comparison,
> the
> > proposed approach only requires users of specific sources (e.g.
> > HybridSource, MySQL CDC) to know the new source-specific config.
> >
> > - It introduces a new top-level concept in Flink to describe the internal
> > stages of specific sources (e.g. MySQL CDC). In comparison, the proposed
> > approach only requires users of specific sources (e.g. HybridSource,
> MySQL
> > CDC) to know this concept, which not only makes the explanation much
> > simpler (since they are already using the specific sources), but also
> > limits the scope of this new concept (only these users need to know this
> > concept).
> >
> > - It is harder to understand the existing config
> > execution.checkpoint.interval.
> > Because we need to explain that it is only used when there is no source
> > with "bounded stage", introducing more if-else for this config. In
> > comparison, with the proposed approach, the semantics of
> > execution.checkpoint.interval is simpler without if/else, as it will
> always
> > be applied regardless which sources users are using.
> >
> > I am happy to discuss if there are better approaches.
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, May 24, 2023 at 8:23 AM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> >
> > > Hi Yunfeng, Hi Dong
> > >
> > > Thanks for the informative discussion! It is a rational requirement to
> > set
> > > different checkpoint intervals for different sources in a hybridSource.
> > The
> > > tiny downside of this proposal, at least for me, is that I have to
> > > understand the upper-bound definition of the interval and the built-in
> > rule
> > > for Flink to choose the minimum value between it and the default
> interval
> > > setting. However, afaiac, the intention of this built-in rule is to
> > > minimize changes in Flink to support the request feature which is a
> very
> > > thoughtful move. Thanks for taking care of it. +1 for the Proposal.
> > >
> > > Another very rough idea was rising in my mind while I was reading the
> > FLIP.
> > > I didn't do a deep dive with related source code yet, so please correct
> > me
> > > if I am wrong. The use case shows that two different checkpoint
> intervals
> > > should be set for bounded(historical) stream and unbounded(fresh
> > real-time)
> > > stream sources. It is a trade-off between throughput and latency, i.e.
> > > bounded stream with large checkpoint interval for better throughput and
> > > unbounded stream with small checkpoint interval for lower latency (in
> > case
> > > of failover). As we could see that the different interval setting
> depends
> > > on the boundedness of streams. Since the Source API already has its own
> > > boundedness flag[1], is it possible to define two interval
> configurations
> > > and let Flink automatically set the related one to the source based on
> > the
> > > known boundedness? The interval for bounded stream could be like
> > > execution.checkpoint.interval.bounded(naming could be reconsidered),
> and
> > > the other one for unbounded stream, we could use the existing one
> > > execution.checkpoint.interval by default, or introduce a new one like
> > > execution.checkpoint.interval.unbounded. In this way, no API change is
> > > required.
> > >
> > > @Piotr
> > > Just out of curiosity, do you know any real use cases where real-time
> > data
> > > is processed before the backlog? Semantically, the backlog contains
> > > historical data that has to be processed before the real-time data is
> > > allowed to be processed. Otherwise, up-to-date data will be overwritten
> > by
> > > out-of-date data which turns out to be unexpected results in real
> > business
> > > scenarios.
> > >
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
> > >
> > > On Tue, May 23, 2023 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for the comments. Let me try to understand your concerns and
> > > > hopefully address the concerns.
> > > >
> > > > >> What would happen if there are two (or more) operator coordinators
> > > with
> > > > conflicting desired checkpoint trigger behaviour
> > > >
> > > > With the proposed change, there won't exist any "*conflicting*
> desired
> > > > checkpoint trigger" by definition. Both job-level config and the
> > proposed
> > > > API upperBoundCheckpointingInterval() means the upper-bound of the
> > > > checkpointing interval. If there are different upper-bounds proposed
> by
> > > > different source operators and the job-level config, Flink will try
> to
> > > > periodically trigger checkpoints at the interval corresponding to the
> > > > minimum of all these proposed upper-bounds.
> > > >
> > > > >> If one source is processing a backlog and the other is already
> > > > processing real time data..
> > > >
> > > > Overall, I am not sure we always want to have a longer checkpointing
> > > > interval. That really depends on the specific use-case and the job
> > graph.
> > > >
> > > > The proposed API change mechanism for operators and users to specify
> > > > different checkpoint intervals at different periods of the job. Users
> > > have
> > > > the option to use the new API to get better performance in the
> use-case
> > > > specified in the motivation section. I believe there can be use-case
> > > where
> > > > the proposed API is not useful, in which case users can choose not to
> > use
> > > > the API without incurring any performance regression.
> > > >
> > > > >> it might be a bit confusing and not user friendly to have multiple
> > > > places that can override the checkpointing behaviour in a different
> way
> > > >
> > > > Admittedly, adding more APIs always incur more complexity. But
> > sometimes
> > > we
> > > > have to incur this complexity to address new use-cases. Maybe we can
> > see
> > > if
> > > > there are more user-friendly way to address this use-case.
> > > >
> > > > >> already implemented and is simple from the perspective of Flink
> > > >
> > > > Do you mean that the HybridSource operator should invoke the rest API
> > to
> > > > trigger checkpoints? The downside of this approach is that it makes
> it
> > > hard
> > > > for developers of source operators (e.g. MySQL CDC, HybridSource) to
> > > > address the target use-case. AFAIK, there is no existing case where
> we
> > > > require operator developers to use REST API to do their job.
> > > >
> > > > Can you help explain the benefit of using REST API over using the
> > > proposed
> > > > API?
> > > >
> > > > Note that this approach also seems to have the same downside
> mentioned
> > > > above: "multiple places that can override the checkpointing
> > behaviour". I
> > > > am not sure there can be a solution to address the target use-case
> > > without
> > > > having multiple places that can affect the checkpointing behavior.
> > > >
> > > > >> check if `pendingRecords` for some source has exceeded the
> > configured
> > > > threshold and based on that adjust the checkpointing interval
> > accordingly
> > > >
> > > > I am not sure this approach can address the target use-case in a
> better
> > > > way. In the target use-case, we would like to HybridSource to trigger
> > > > checkpoint more frequently when it is read the Kafka Source (than
> when
> > it
> > > > is reading the HDFS source). We would need to set a flag for the
> > > checkpoint
> > > > trigger to know which source the HybridSource is reading from. But
> IMO
> > > the
> > > > approach is less intuitive and more complex than having the
> > HybridSource
> > > > invoke upperBoundCheckpointingInterval() directly once it is reading
> > > Kafka
> > > > Source.
> > > >
> > > > Maybe I did not understand the alternative approach rightly. I am
> happy
> > > to
> > > > discuss more on this topic. WDYT?
> > > >
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for the proposal. However, are you sure that the
> > > > > OperatorCoordinator is the right place to place such logic? What
> > would
> > > > > happen if there are two (or more) operator coordinators with
> > > conflicting
> > > > > desired checkpoint trigger behaviour? If one source is processing a
> > > > backlog
> > > > > and the other is already processing real time data, I would assume
> > that
> > > > in
> > > > > most use cases you would like to still have the longer
> checkpointing
> > > > > interval, not the shorter one. Also apart from that, it might be a
> > bit
> > > > > confusing and not user friendly to have multiple places that can
> > > override
> > > > > the checkpointing behaviour in a different way.
> > > > >
> > > > > FIY in the past, we had some discussions about similar requests and
> > > back
> > > > > then we chose to keep the system simpler, and exposed a more
> generic
> > > REST
> > > > > API checkpoint triggering mechanism. I know that having to
> implement
> > > such
> > > > > logic outside of Flink and having to call REST calls to trigger
> > > > checkpoints
> > > > > might not be ideal, but that's already implemented and is simple
> from
> > > the
> > > > > perspective of Flink.
> > > > >
> > > > > I don't know, maybe instead of adding this logic to operator
> > > > coordinators,
> > > > > `CheckpointCoordinator` should have a pluggable
> `CheckpointTrigger`,
> > > that
> > > > > the user could configure like a `MetricReporter`. The default one
> > would
> > > > be
> > > > > just periodically triggering checkpoints. Maybe
> > > > > `BacklogDynamicCheckpointTrigger` could look at metrics[1], check
> if
> > > > > `pendingRecords` for some source has exceeded the configured
> > threshold
> > > > and
> > > > > based on that adjust the checkpointing interval accordingly? This
> > would
> > > > at
> > > > > least address some of my concerns.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > >
> > > > > wt., 9 maj 2023 o 19:11 Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> > > > > napisał(a):
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Dong(cc'ed) and I are opening this thread to discuss our proposal
> to
> > > > >> support dynamically triggering checkpoints from operators, which
> has
> > > > >> been documented in FLIP-309
> > > > >> <
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> > > > >> >.
> > > > >>
> > > > >> With the help of the ability proposed in this FLIP, users could
> > > > >> improve the performance of their Flink job in cases like when the
> > job
> > > > >> needs to process both historical batch data and real-time
> streaming
> > > > >> data, by adjusting the checkpoint triggerings in different phases
> > of a
> > > > >> HybridSource or CDC source.
> > > > >>
> > > > >> This proposal would be a fundamental component in the effort to
> > > > >> further unify Flink's batch and stream processing ability. Please
> > feel
> > > > >> free to reply to this email thread and share with us your
> opinions.
> > > > >>
> > > > >> Best regards.
> > > > >>
> > > > >> Dong and Yunfeng
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to