Hi again,

Thanks Dong, yes I think your concerns are valid, and that's why I have
previously refined my idea to use one of the backpressure measuring metrics
that we already have.
Either simply `isBackPressured == true` check [1], or
`backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That would
address your three first concerns:
  - lack of event time
  - event time unreliability
  - lack of universal threshold value for `pendingRecords`

In a bit more detail, we probably should check (using [1] or [2]) either:
  a) if any of the source subtasks is backpressured
  b) if any of the subtasks is backpressured

In most cases a == b. The only time when that's not true, if some windowed
operator in the middle of the job graph started triggering so many results
that it became backpressured,
but the backpressure didn't last long enough to propagate to sources. For
example that especially might occur if sources are idle. So probably b) is
a better and more generic option.

Regarding your last concern, with spiky traffic, I think the following
algorithm of triggering checkpoints would work pretty well:

public BackpressureDetectingCheckpointTrigger {

private long lastCheckpointTs = System.currentTimeMillis();
private long slowCheckpointInterval = ...;
private long fastCheckpointInteveral = ...;

//code executed periodically, for example once a second, once every 10ms,
or at the 1/10th of the fast checkpoint interval
void maybeTriggerCheckpoint(...) {

  long nextCheckpointTs = lastCheckpointTs;
  if (isAnySubtaskBackpressured()) {
    nextCheckpointTs += slowCheckpointInterval;
  else {
      nextCheckpointTs += fastCheckpointInterval;

  if (nextCheckpointTs >= System.currentTimeMillis()) {
    lastCheckpointTs = System.currentTimeMillis();

This way, if there is a spike of backpressure, it doesn't matter that much.
If the backpressure goes away until the next iteration, the next check will
trigger a checkpoint according to the
fast interval. The slow checkpoint interval will be used only if the
backpressure persists for the whole duration of the slowCheckpointInterval.

We could also go a little bit more fancy, and instead of using only fast or
slow intervals, we could use a continuous spectrum to gradually adjust the
interval, by replacing the first if/else
check with a weighted average:

  int maxBackPressureTime = getSubtaskMaxBackPressuredTimeMsPerSecond();
  long nextCheckpointTs = lastCheckpointTs + slowCheckpointInterval *
maxBackPressureTime + fastCheckpointInterval * (1000 - maxBackPressureTime);

This would further eliminate some potential jitter and make the actual
checkpoint interval a bit more predictable.


wt., 30 maj 2023 o 04:40 Dong Lin <lindon...@gmail.com> napisał(a):

> Let me correct the typo in the last paragraph as below:
> To make the problem even harder, the incoming traffic can be spiky. And the
> overhead of triggering checkpointing can be relatively low, in which case
> it might be more performant (w.r.t. e2e lag) for the Flink job to
> checkpoint at the more frequent interval in the continuous phase in face of
> a spike in the number of pending records buffered in the source operator.
> On Tue, May 30, 2023 at 9:17 AM Dong Lin <lindon...@gmail.com> wrote:
> > Hi Piotrek,
> >
> > Thanks for providing more details of the alternative approach!
> >
> > If I understand your proposal correctly, here are the requirements for it
> > to work without incurring any regression:
> >
> > 1) The source needs a way to determine whether there exists backpressure.
> > 2) If there is backpressure, then it means e2e latency is already high
> > and there should be no harm to use the less frequent checkpointing
> interval.
> > 3) The configuration of the "less frequent checkpointing interval" needs
> > to be a job-level config so that it works for sources other than
> > HybridSource.
> >
> > I would say that if we can find a way for the source to determine the
> > "existence of backpressure" and meet the requirement 2), it would indeed
> be
> > a much more elegant approach that solves more use-cases.
> >
> > The devil is in the details. I am not sure how to determine the
> "existence
> > of backpressure". Let me explain my thoughts and maybe you can help
> > provide the answers.
> >
> > To make the discussion more concrete, let's say the input records do not
> > have event timestamps. Users want to checkpoint at least once every 30
> > minutes to upper-bound the amount of duplicate work after job failover.
> And
> > users want to checkpoint at least once every 30 seconds to upper-bound
> *extra
> > e2e lag introduced by the Flink job* during the continuous processing
> > phase.
> >
> > Since the input records do not have event timestamps, we can not rely on
> > metrics such as currentFetchEventTimeLag [1] to determine the absolute
> e2e
> > lag, because currentFetchEventTimeLag depends on the existence of event
> > timestamps.
> >
> > Also note that, even if the input records have event timestamps and we
> can
> > measure currentFetchEventTimeLag, we still need a threshold to determine
> > whether the value of currentFetchEventTimeLag is too high. One idea might
> > be to use the user-specified "less frequent checkpointing interval" as
> > this threshold, which in this case is 30 seconds. But this approach can
> > also cause regression. For example, let's say the records go through
> > several Kafka/MirrorMaker pipelines after it is generated and before it
> is
> > received by Flink, causing its currentFetchEventTimeLag to be always
> higher
> > than 30 seconds. Then Flink will end up always using the "less frequent
> > checkpointing interval" in the continuous phase, which in this case is 30
> > minutes.
> >
> > Other options to determine the "existence of backpressure" includes using
> > the absolute number of records in the source storage system that are
> > waiting to be fetched (e.g. pendingRecords [1]), or using the absolute
> > number of buffered records in the source output queue. However, I find it
> > hard to reliably determine "e2e latency is already high" based on the
> > absolute number of records. What threshold should we choose to determine
> > that the number of pending records is too many (and it is safe to
> increase
> > the checkpointing interval)?
> >
> > To make the problem even harder, the incoming traffic can be spiky. And
> > the overhead of triggering checkpointing can be relative low, in which
> case
> > it might be more performance (w.r.t. e2e lag) for the Flink job to
> > checkpoint at the higher interval in the continuous phase in face of a
> > spike in the number of pending records buffered in the source operator.
> >
> > The problems described above are the main reasons that I can not find a
> > way to make the alternative approach work. Any thoughts?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> > On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> >> Hi
> >>
> >> @Jing
> >>
> >> > Your proposal to dynamically adjust the checkpoint intervals is
> elegant!
> >> It
> >> > makes sense to build it as a generic feature in Flink. Looking forward
> >> to
> >> > it. However, for some user cases, e.g. when users were aware of the
> >> bounded
> >> > sources (in the HybridSource) and care more about the throughput, the
> >> > dynamic adjustment might not be required. Just let those bounded
> sources
> >> > always have larger checkpoint intervals even when there is no back
> >> > pressure. Because no one cares about latency in this case, let's turn
> >> off
> >> > the dynamic adjustment, reduce the checkpoint frequency, have better
> >> > throughput, and save unnecessary source consumption. Did I miss
> anything
> >> > here?
> >>
> >> But why do we need to have two separate mechanisms, if the dynamic
> >> adjustment based on the backpressure/backlog would
> >> achieve basically the same goal as your proposal and would solve both of
> >> the problems? Having two independent solutions
> >> in the same codebase, in the docs, that are achieving basically the same
> >> thing is far from ideal. It would increase both the
> >> complexity of the system and confuse potential users.
> >>
> >> Moreover, as I have already mentioned before, I don't like the current
> >> proposal as it's focusing ONLY on the HybridSource,
> >> which can lead to even worse problem in the future, where many different
> >> sources would have each a completely custom
> >> solution to solve the same/similar problems, complicating the system and
> >> confusing the users even more.
> >>
> >> @Dong,
> >>
> >> > For now I am not able to come up with a good way to support this. I am
> >> happy to discuss the
> >> > pros/cons if you can provide more detail (e.g. API design) regarding
> how
> >> to support this approach
> >>
> >> I have already described such proposal:
> >>
> >> > Piotr:
> >> > I don't know, maybe instead of adding this logic to operator
> >> coordinators, `CheckpointCoordinator` should have a pluggable
> >> `CheckpointTrigger`,
> >> > that the user could configure like a `MetricReporter`. The default one
> >> would be just periodically triggering checkpoints. Maybe
> >> > `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> >> `pendingRecords` for some source has exceeded the configured
> >> > threshold and based on that adjust the checkpointing interval
> >> accordingly? This would at least address some of my concerns.
> >>
> >> plus
> >>
> >> > Piotr:
> >> >  Either way, I would like to refine my earlier idea, and instead of
> >> using
> >> metrics like `pendingRecords`, I think we could switch between fast and
> >> > slow checkpointing intervals based on the information if the job is
> >> backpressured or not. My thinking is as follows:
> >> >
> >> > As a user, I would like to have my regular fast checkpointing interval
> >> for low latency, but the moment my system is not keeping up, if the
> >> backpressure
> >> > builds up, or simply we have a huge backlog to reprocess, latency
> >> doesn't
> >> matter anymore. Only throughput matters. So I would like the
> checkpointing
> >> to slow down.
> >> >
> >> > I think this should cover pretty well most of the cases, what do you
> >> think? If this backpressured based behaviour is still not enough, I
> would
> >> still say
> >> > that we should provide plugable checkpoint triggering controllers that
> >> would work based on metrics.
> >>
> >> > change the checkpointing interval based on the "backlog signal",
> >>
> >> What's wrong with the job being backpressured? If job is backpressured,
> we
> >> don't care about individual records latency, only about increasing
> >> the throughput to get out of the backpressure situation ASAP.
> >>
> >> > In the mentioned use-case, users want to have two different
> >> checkpointing
> >> > intervals at different phases of the HybridSource. We should provide
> an
> >> API
> >> > for users to express the extra checkpointing interval in addition to
> the
> >> > existing execution.checkpointing.interval. What would be the
> definition
> >> of
> >> > that API with this alternative approach?
> >>
> >> I think my proposal with `BacklogDynamicCheckpointTrigger` or
> >> `BackpressureDetectingCheckpointTrigger` would solve your motivating use
> >> case
> >> just as well.
> >>
> >> 1. In the catch up phase (reading the bounded source):
> >>   a) if we are under backpressure (common case), system would fallback
> to
> >> the less frequent checkpointing interval
> >>   b) if there is no backpressure (I hope a rare case, there is a
> backlog,
> >> but the source is too slow), Flink cluster has spare resources to
> actually
> >> run more
> >>       frequent checkpointing interval. No harm should be done. But
> >> arguably
> >> using a less frequent checkpointing interval here should be more
> >> desirable.
> >>
> >> 2. In the continuous processing phase (unbounded source)
> >>   a) if we are under backpressure, as I mentioned above, no one cares
> >> about
> >> checkpointing interval and the frequency of committing records to the
> >>       output, as e2e latency is already high due to the backlog in the
> >> sources
> >>   b) if there is no backpressure, that's the only case where the user
> >> actually cares about the frequency of committing records to the output,
> we
> >> are
> >>       using the more frequent checkpointing interval.
> >>
> >> 1b) I think is mostly harmless, and I think could be solved with some
> >> extra
> >> effort
> >> 2a) and 2b) are not solved by your proposal
> >> 2a) and 2b) are applicable to any source, not just HybridSource, which
> is
> >> also not covered by your proposal.
> >>
> >> Best,
> >> Piotrek
> >>
> >>
> >> czw., 25 maj 2023 o 17:29 Jing Ge <j...@ververica.com.invalid>
> >> napisał(a):
> >>
> >> > Hi Dong, Hi Piotr,
> >> >
> >> > Thanks for the clarification.
> >> >
> >> > @Dong
> >> >
> >> > According to the code examples in the FLIP, I thought we are focusing
> on
> >> > the HybridSource scenario. With the current HybridSource
> >> implementation, we
> >> > don't even need to know the boundedness of sources in the
> HybridSource,
> >> > since all sources except the last one must be bounded[1], i.e. only
> the
> >> > last source is unbounded. This makes it much easier to set different
> >> > intervals to sources with different boundedness.
> >> >
> >> > Boundedness in Flink is a top level concept. I think it should be ok
> to
> >> > introduce a top level config for the top level concept. I am not
> >> familiar
> >> > with MySQL CDC. For those specific cases, you are right, your proposal
> >> can
> >> > provide the feature with minimal changes, like I mentioned previously,
> >> it
> >> > is a thoughtful design.  +1
> >> >
> >> > @Piotr
> >> >
> >> > > For example join (windowed/temporal) of two tables backed by a
> hybrid
> >> > > source? I could easily see a scenario where one table with little
> data
> >> > > catches up much more quickly.
> >> >
> >> > I am confused. I thought we were talking about HybridSource which
> >> "solves
> >> > the problem of sequentially reading input from heterogeneous sources
> to
> >> > produce a single input stream."[2]
> >> > I could not find any join within a HybridSource. So, your might mean
> >> > something else the join example and it should be out of the scope, if
> I
> >> am
> >> > not mistaken.
> >> >
> >> > > About the (un)boundness of the input stream. I'm not sure if that
> >> should
> >> > > actually matter. Actually the same issue, with two frequent
> >> checkpointing
> >> > > during a catch up period or when Flink is overloaded, could affect
> >> jobs
> >> > > that are purely unbounded, like continuously reading from Kafka.
> Even
> >> > more,
> >> > > nothing prevents users from actually storing bounded data in a Kafka
> >> > topic.
> >> > > Either way, I would like to refine my earlier idea, and instead of
> >> using
> >> > > metrics like `pendingRecords`, I think we could switch between fast
> >> and
> >> > > slow checkpointing intervals based on the information if the job is
> >> > > backpressured or not. My thinking is as follows:
> >> >
> >> > This is again a very different use case as HybridSource. Users do
> allow
> >> > storing bounded data in a Kafka and if it is not used as the last
> >> source in
> >> > a HybridSource, it is a bounded source and can still benefit from
> larger
> >> > checkpoint interval wrt the high throughput (Kafka or any other
> storage
> >> > does not matter). BTW, the larger checkpoint interval for bounded
> >> source is
> >> > optional, users can use it but must not use it, if they don't care
> about
> >> > the throughput with bounded data.
> >> >
> >> > Your proposal to dynamically adjust the checkpoint intervals is
> >> elegant! It
> >> > makes sense to build it as a generic feature in Flink. Looking forward
> >> to
> >> > it. However, for some user cases, e.g. when users were aware of the
> >> bounded
> >> > sources (in the HybridSource) and care more about the throughput, the
> >> > dynamic adjustment might not be required. Just let those bounded
> sources
> >> > always have larger checkpoint intervals even when there is no back
> >> > pressure. Because no one cares about latency in this case, let's turn
> >> off
> >> > the dynamic adjustment, reduce the checkpoint frequency, have better
> >> > throughput, and save unnecessary source consumption. Did I miss
> anything
> >> > here?
> >> >
> >> > Best regards,
> >> > Jing
> >> >
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
> >> > [2]
> >> >
> >> >
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source
> >> >
> >> >
> >> > On Thu, May 25, 2023 at 3:03 PM Dong Lin <lindon...@gmail.com> wrote:
> >> >
> >> > > Hi Piotr,
> >> > >
> >> > > Thanks for the discussion. Please see my comments inline.
> >> > >
> >> > > On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski <
> pnowoj...@apache.org>
> >> > > wrote:
> >> > >
> >> > > > Hi all,
> >> > > >
> >> > > > Thanks for the discussion.
> >> > > >
> >> > > > @Dong
> >> > > >
> >> > > > > In the target use-case, we would like to HybridSource to
> trigger>
> >> > > > checkpoint more frequently when it is read the Kafka Source (than
> >> when
> >> > it
> >> > > > > is reading the HDFS source). We would need to set a flag for the
> >> > > > checkpoint
> >> > > > > trigger to know which source the HybridSource is reading from.
> >> > > >
> >> > > > Is this really your actual goal? Should users care if some table
> >> > defined
> >> > > in
> >> > > >
> >> > >
> >> > > My actual goal is to address the use-case described in the
> motivation
> >> > > section. More specifically,
> >> > > my goal is to provide API that uses can use to express their needed
> >> > > checkpointing interval
> >> > > at different phases of the job. So that Flink can achieve the
> maximum
> >> > > throughput while also meeting
> >> > > users' need for data freshness and failover time.
> >> > >
> >> > >
> >> > > > a meta store is backed by HybridSource or not? I think the actual
> >> goal
> >> > is
> >> > > > this:
> >> > >
> >> > > As a user I would like to have a self adjusting mechanism for
> >> > checkpointing
> >> > > > intervals, so that during the catch up phase my job focuses on
> >> > throughput
> >> > > > to catch up ASAP, while during normal processing (without a large
> >> > > backlog)
> >> > > > Flink is trying to minimize e2e latency.
> >> > > >
> >> > >
> >> > > Sure. It will be great to have a way to support this self-adjusting
> >> > > mechanism. For now I am not able
> >> > > to come up with a good way to support this. I am happy to discuss
> the
> >> > > pros/cons if you can provide
> >> > > more detail (e.g. API design) regarding how to support this
> approach.
> >> > >
> >> > >
> >> > > >
> >> > > > Am I right here?
> >> > > >
> >> > > > > there won't exist any "*conflicting* desired checkpoint trigger"
> >> by
> >> > > > definition
> >> > > >
> >> > > > Ok, arguably there won't be a conflict, but the decision to pick
> >> > minimum
> >> > > > out of the upper bounds might be sub-optimal.
> >> > > >
> >> > >
> >> > > As of today, users need checkpoint in order to address two goals.
> One
> >> > goal
> >> > > is to upper-bound
> >> > > data staleness when there is sink with exactly-once semantics (e.g.
> >> > > Paimon), since those sinks
> >> > > can only output data when checkpoint is triggered. The other goal is
> >> to
> >> > > upper-bound the amount of
> >> > > duplicate work needed after failover.
> >> > >
> >> > > In both cases, users need to upper-bound the checkpointing interval.
> >> This
> >> > > makes it more intuitive
> >> > > for the config to only express the checkpointing interval
> upper-bound.
> >> > >
> >> > >
> >> > > >
> >> > > > > Overall, I am not sure we always want to have a longer
> >> checkpointing
> >> > > > > interval. That really depends on the specific use-case and the
> job
> >> > > graph.
> >> > > >
> >> > > > Yes, that's why I proposed something a little bit more generic.
> >> > > >
> >> > >
> >> > > I am not sure I fully understand the alternative proposal that is
> >> meant
> >> > to
> >> > > be more generic. So it is hard for me to evaluate the pros/cons.
> >> > >
> >> > > I understand that you preferred for the source operator to use the
> >> REST
> >> > API
> >> > > to trigger checkpoints. This sounds
> >> > > like a downside since using REST API is not as easy as using the
> >> > > programming API proposed in the FLIP.
> >> > >
> >> > > Can you help explain the generic approach more concretely, such as
> the
> >> > APIs
> >> > > you would suggest introducing? That would
> >> > > allow me to evaluate the pros/cons and hopefully pick the best
> option.
> >> > >
> >> > >
> >> > > >
> >> > > > > I believe there can be use-case where
> >> > > > > the proposed API is not useful, in which case users can choose
> >> not to
> >> > > use
> >> > > > > the API without incurring any performance regression.
> >> > > >
> >> > > > I'm not saying that this proposal is not useful. Just that we
> might
> >> be
> >> > > able
> >> > > > to solve this problem in a more flexible manner. If we introduce a
> >> > > > partially working solution now at the source level, and later we
> >> will
> >> > > still
> >> > > > need a different solution on another level to cover other use
> cases,
> >> > that
> >> > > > would clog the API and confuse users.
> >> > > >
> >> > >
> >> > > Can you explain why this is "partially working"? Is it because there
> >> are
> >> > > use-cases that should
> >> > > be addressed but not already covered by the proposed approach?
> >> > >
> >> > > If so, can you help explain the use-case that would be useful to
> >> address?
> >> > > With concrete
> >> > > use-cases in mind, we can pick the API with minimal change to
> address
> >> > these
> >> > > use-cases.
> >> > >
> >> > >
> >> > > >
> >> > > > @Jing
> >> > > >
> >> > > > > @Piotr
> >> > > > > Just out of curiosity, do you know any real use cases where
> >> real-time
> >> > > > data is processed before the backlog?
> >> > > >
> >> > > > For example join (windowed/temporal) of two tables backed by a
> >> hybrid
> >> > > > source? I could easily see a scenario where one table with little
> >> data
> >> > > > catches up much more quickly.
> >> > > >
> >> > > > @Jing and @Dong
> >> > > >
> >> > > > About the (un)boundness of the input stream. I'm not sure if that
> >> > should
> >> > > > actually matter. Actually the same issue, with two frequent
> >> > checkpointing
> >> > > >
> >> > >
> >> > > Indeed, I agree with you on this point and prefer not to have this
> >> > proposal
> >> > > depend on the (un)boundness.
> >> > >
> >> > >
> >> > > > during a catch up period or when Flink is overloaded, could affect
> >> jobs
> >> > > > that are purely unbounded, like continuously reading from Kafka.
> >> Even
> >> > > more,
> >> > > > nothing prevents users from actually storing bounded data in a
> Kafka
> >> > > topic.
> >> > > > Either way, I would like to refine my earlier idea, and instead of
> >> > using
> >> > > > metrics like `pendingRecords`, I think we could switch between
> fast
> >> and
> >> > > > slow checkpointing intervals based on the information if the job
> is
> >> > > > backpressured or not. My thinking is as follows:
> >> > > >
> >> > > > As a user, I would like to have my regular fast checkpointing
> >> interval
> >> > > for
> >> > > > low latency, but the moment my system is not keeping up, if the
> >> > > > backpressure builds up, or simply we have a huge backlog to
> >> reprocess,
> >> > > > latency doesn't matter anymore. Only throughput matters. So I
> would
> >> > like
> >> > > > the checkpointing to slow down.
> >> > > >
> >> > > > I think this should cover pretty well most of the cases, what do
> you
> >> > > think?
> >> > > >
> >> > >
> >> > > Thank you for all the comments and this idea. I like this idea. We
> >> > actually
> >> > > thought about this idea before proposing this FLIP.
> >> > >
> >> > > In order to make this idea work, we need to come-up with a good
> >> algorithm
> >> > > that can dynamically change the checkpointing interval based on the
> >> > > "backlog signal", without causing regression w.r.t. failover time
> and
> >> > data
> >> > > freshness. I find it hard to come up with this algorithm due to
> >> > > insufficient "backlog signal".
> >> > >
> >> > > For the use-case mentioned in the motivation section, the data in
> the
> >> > > source does not have event timestamps to help determine the amount
> of
> >> > > backlog. So the only source-of-truth for determining backlog is the
> >> > amount
> >> > > of data buffered in operators. But the buffer size is typically
> >> chosen to
> >> > > be proportional to round-trip-time and throughput. Having a full
> >> buffer
> >> > > does not necessarily mean that the data is lagging behind. And
> >> increasing
> >> > > the checkpointing interval with insufficient "backlog signal" can
> >> have a
> >> > > negative impact on data freshness and failover time.
> >> > >
> >> > > In order to make this idea work, we would need to *provide* that the
> >> > > algorithm would not negatively hurt data freshness and failover time
> >> when
> >> > > it decides to increase checkpointing intervals. For now I cold not
> >> come
> >> > up
> >> > > with such an algorithm.
> >> > >
> >> > > If this backpressured based behaviour is still not enough, I would
> >> still
> >> > > > say that we should provide plugable checkpoint triggering
> >> controllers
> >> > > that
> >> > > > would work based on metrics.
> >> > >
> >> > >
> >> > > I am not sure how to address the use-case mentioned in the
> motivation
> >> > > section, with the pluggable checkpoint trigger + metrics. Can you
> help
> >> > > provide the definition of these APIs and kindly explain how that
> >> works to
> >> > > address the mentioned use-case.
> >> > >
> >> > > In the mentioned use-case, users want to have two different
> >> checkpointing
> >> > > intervals at different phases of the HybridSource. We should provide
> >> an
> >> > API
> >> > > for users to express the extra checkpointing interval in addition to
> >> the
> >> > > existing execution.checkpointing.interval. What would be the
> >> definition
> >> > of
> >> > > that API with this alternative approach?
> >> > >
> >> > > Best,
> >> > > Dong
> >> > >
> >> > >
> >> > > >
> >> > > > Best,
> >> > > > Piotrek
> >> > > >
> >> > > > czw., 25 maj 2023 o 07:47 Dong Lin <lindon...@gmail.com>
> >> napisał(a):
> >> > > >
> >> > > > > Hi Jing,
> >> > > > >
> >> > > > > Thanks for your comments!
> >> > > > >
> >> > > > > Regarding the idea of using the existing "boundedness" attribute
> >> of
> >> > > > > sources, that is indeed something that we might find intuitive
> >> > > > initially. I
> >> > > > > have thought about this idea, but could not find a good way to
> >> make
> >> > it
> >> > > > > work. I will try to explain my thoughts and see if we can find a
> >> > better
> >> > > > > solution.
> >> > > > >
> >> > > > > Here is my understanding of the idea mentioned above: provide a
> >> job
> >> > > level
> >> > > > > config execution.checkpoint.interval.bounded. Flink will use
> this
> >> as
> >> > > the
> >> > > > > checkpointing interval whenever there exists at least one
> running
> >> > > source
> >> > > > > which claims it is under the "bounded" stage.
> >> > > > >
> >> > > > > Note that we can not simply re-use the existing "boundedness"
> >> > attribute
> >> > > > of
> >> > > > > source operators. The reason is that for sources such as MySQL
> >> CDC,
> >> > its
> >> > > > > boundedness can be "continuous_unbounded" because it can run
> >> > > > continuously.
> >> > > > > But MySQL CDC has two phases internally, where the source needs
> to
> >> > > first
> >> > > > > read a snapshot (with bounded amount of data) and then read a
> >> binlog
> >> > > > (with
> >> > > > > unbounded amount of data).
> >> > > > >
> >> > > > > As a result, in order to support optimization for souces like
> >> MySQL
> >> > > CDC,
> >> > > > we
> >> > > > > need to expose an API for the source operator to declare whether
> >> it
> >> > is
> >> > > > > running at a bounded or continuous_unbounded stage. *This
> >> introduces
> >> > > the
> >> > > > > need to define a new concept named "bounded stage".*
> >> > > > >
> >> > > > > Then, we will need to *introduce a new contract between source
> >> > > operators
> >> > > > > and the Flink runtime*, saying that if there is a source that
> >> claims
> >> > it
> >> > > > is
> >> > > > > running at the bounded stage, then Flink will use the "
> >> > > > > execution.checkpoint.interval.bounded" as the checkpointing
> >> interval.
> >> > > > >
> >> > > > > Here are the the concerns I have with this approach:
> >> > > > >
> >> > > > > - The execution.checkpoint.interval.bounded is a top-level
> config,
> >> > > > meaning
> >> > > > > that every Flink user needs to read about its semantics. In
> >> > comparison,
> >> > > > the
> >> > > > > proposed approach only requires users of specific sources (e.g.
> >> > > > > HybridSource, MySQL CDC) to know the new source-specific config.
> >> > > > >
> >> > > > > - It introduces a new top-level concept in Flink to describe the
> >> > > internal
> >> > > > > stages of specific sources (e.g. MySQL CDC). In comparison, the
> >> > > proposed
> >> > > > > approach only requires users of specific sources (e.g.
> >> HybridSource,
> >> > > > MySQL
> >> > > > > CDC) to know this concept, which not only makes the explanation
> >> much
> >> > > > > simpler (since they are already using the specific sources), but
> >> also
> >> > > > > limits the scope of this new concept (only these users need to
> >> know
> >> > > this
> >> > > > > concept).
> >> > > > >
> >> > > > > - It is harder to understand the existing config
> >> > > > > execution.checkpoint.interval.
> >> > > > > Because we need to explain that it is only used when there is no
> >> > source
> >> > > > > with "bounded stage", introducing more if-else for this config.
> In
> >> > > > > comparison, with the proposed approach, the semantics of
> >> > > > > execution.checkpoint.interval is simpler without if/else, as it
> >> will
> >> > > > always
> >> > > > > be applied regardless which sources users are using.
> >> > > > >
> >> > > > > I am happy to discuss if there are better approaches.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Dong
> >> > > > >
> >> > > > >
> >> > > > > On Wed, May 24, 2023 at 8:23 AM Jing Ge
> >> <j...@ververica.com.invalid>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Yunfeng, Hi Dong
> >> > > > > >
> >> > > > > > Thanks for the informative discussion! It is a rational
> >> requirement
> >> > > to
> >> > > > > set
> >> > > > > > different checkpoint intervals for different sources in a
> >> > > hybridSource.
> >> > > > > The
> >> > > > > > tiny downside of this proposal, at least for me, is that I
> have
> >> to
> >> > > > > > understand the upper-bound definition of the interval and the
> >> > > built-in
> >> > > > > rule
> >> > > > > > for Flink to choose the minimum value between it and the
> default
> >> > > > interval
> >> > > > > > setting. However, afaiac, the intention of this built-in rule
> >> is to
> >> > > > > > minimize changes in Flink to support the request feature which
> >> is a
> >> > > > very
> >> > > > > > thoughtful move. Thanks for taking care of it. +1 for the
> >> Proposal.
> >> > > > > >
> >> > > > > > Another very rough idea was rising in my mind while I was
> >> reading
> >> > the
> >> > > > > FLIP.
> >> > > > > > I didn't do a deep dive with related source code yet, so
> please
> >> > > correct
> >> > > > > me
> >> > > > > > if I am wrong. The use case shows that two different
> checkpoint
> >> > > > intervals
> >> > > > > > should be set for bounded(historical) stream and
> unbounded(fresh
> >> > > > > real-time)
> >> > > > > > stream sources. It is a trade-off between throughput and
> >> latency,
> >> > > i.e.
> >> > > > > > bounded stream with large checkpoint interval for better
> >> throughput
> >> > > and
> >> > > > > > unbounded stream with small checkpoint interval for lower
> >> latency
> >> > (in
> >> > > > > case
> >> > > > > > of failover). As we could see that the different interval
> >> setting
> >> > > > depends
> >> > > > > > on the boundedness of streams. Since the Source API already
> has
> >> its
> >> > > own
> >> > > > > > boundedness flag[1], is it possible to define two interval
> >> > > > configurations
> >> > > > > > and let Flink automatically set the related one to the source
> >> based
> >> > > on
> >> > > > > the
> >> > > > > > known boundedness? The interval for bounded stream could be
> like
> >> > > > > > execution.checkpoint.interval.bounded(naming could be
> >> > reconsidered),
> >> > > > and
> >> > > > > > the other one for unbounded stream, we could use the existing
> >> one
> >> > > > > > execution.checkpoint.interval by default, or introduce a new
> one
> >> > like
> >> > > > > > execution.checkpoint.interval.unbounded. In this way, no API
> >> change
> >> > > is
> >> > > > > > required.
> >> > > > > >
> >> > > > > > @Piotr
> >> > > > > > Just out of curiosity, do you know any real use cases where
> >> > real-time
> >> > > > > data
> >> > > > > > is processed before the backlog? Semantically, the backlog
> >> contains
> >> > > > > > historical data that has to be processed before the real-time
> >> data
> >> > is
> >> > > > > > allowed to be processed. Otherwise, up-to-date data will be
> >> > > overwritten
> >> > > > > by
> >> > > > > > out-of-date data which turns out to be unexpected results in
> >> real
> >> > > > > business
> >> > > > > > scenarios.
> >> > > > > >
> >> > > > > >
> >> > > > > > Best regards,
> >> > > > > > Jing
> >> > > > > >
> >> > > > > > [1]
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
> >> > > > > >
> >> > > > > > On Tue, May 23, 2023 at 5:53 PM Dong Lin <lindon...@gmail.com
> >
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Hi Piotr,
> >> > > > > > >
> >> > > > > > > Thanks for the comments. Let me try to understand your
> >> concerns
> >> > and
> >> > > > > > > hopefully address the concerns.
> >> > > > > > >
> >> > > > > > > >> What would happen if there are two (or more) operator
> >> > > coordinators
> >> > > > > > with
> >> > > > > > > conflicting desired checkpoint trigger behaviour
> >> > > > > > >
> >> > > > > > > With the proposed change, there won't exist any
> "*conflicting*
> >> > > > desired
> >> > > > > > > checkpoint trigger" by definition. Both job-level config and
> >> the
> >> > > > > proposed
> >> > > > > > > API upperBoundCheckpointingInterval() means the upper-bound
> of
> >> > the
> >> > > > > > > checkpointing interval. If there are different upper-bounds
> >> > > proposed
> >> > > > by
> >> > > > > > > different source operators and the job-level config, Flink
> >> will
> >> > try
> >> > > > to
> >> > > > > > > periodically trigger checkpoints at the interval
> >> corresponding to
> >> > > the
> >> > > > > > > minimum of all these proposed upper-bounds.
> >> > > > > > >
> >> > > > > > > >> If one source is processing a backlog and the other is
> >> already
> >> > > > > > > processing real time data..
> >> > > > > > >
> >> > > > > > > Overall, I am not sure we always want to have a longer
> >> > > checkpointing
> >> > > > > > > interval. That really depends on the specific use-case and
> the
> >> > job
> >> > > > > graph.
> >> > > > > > >
> >> > > > > > > The proposed API change mechanism for operators and users to
> >> > > specify
> >> > > > > > > different checkpoint intervals at different periods of the
> >> job.
> >> > > Users
> >> > > > > > have
> >> > > > > > > the option to use the new API to get better performance in
> the
> >> > > > use-case
> >> > > > > > > specified in the motivation section. I believe there can be
> >> > > use-case
> >> > > > > > where
> >> > > > > > > the proposed API is not useful, in which case users can
> choose
> >> > not
> >> > > to
> >> > > > > use
> >> > > > > > > the API without incurring any performance regression.
> >> > > > > > >
> >> > > > > > > >> it might be a bit confusing and not user friendly to have
> >> > > multiple
> >> > > > > > > places that can override the checkpointing behaviour in a
> >> > different
> >> > > > way
> >> > > > > > >
> >> > > > > > > Admittedly, adding more APIs always incur more complexity.
> But
> >> > > > > sometimes
> >> > > > > > we
> >> > > > > > > have to incur this complexity to address new use-cases.
> Maybe
> >> we
> >> > > can
> >> > > > > see
> >> > > > > > if
> >> > > > > > > there are more user-friendly way to address this use-case.
> >> > > > > > >
> >> > > > > > > >> already implemented and is simple from the perspective of
> >> > Flink
> >> > > > > > >
> >> > > > > > > Do you mean that the HybridSource operator should invoke the
> >> rest
> >> > > API
> >> > > > > to
> >> > > > > > > trigger checkpoints? The downside of this approach is that
> it
> >> > makes
> >> > > > it
> >> > > > > > hard
> >> > > > > > > for developers of source operators (e.g. MySQL CDC,
> >> HybridSource)
> >> > > to
> >> > > > > > > address the target use-case. AFAIK, there is no existing
> case
> >> > where
> >> > > > we
> >> > > > > > > require operator developers to use REST API to do their job.
> >> > > > > > >
> >> > > > > > > Can you help explain the benefit of using REST API over
> using
> >> the
> >> > > > > > proposed
> >> > > > > > > API?
> >> > > > > > >
> >> > > > > > > Note that this approach also seems to have the same downside
> >> > > > mentioned
> >> > > > > > > above: "multiple places that can override the checkpointing
> >> > > > > behaviour". I
> >> > > > > > > am not sure there can be a solution to address the target
> >> > use-case
> >> > > > > > without
> >> > > > > > > having multiple places that can affect the checkpointing
> >> > behavior.
> >> > > > > > >
> >> > > > > > > >> check if `pendingRecords` for some source has exceeded
> the
> >> > > > > configured
> >> > > > > > > threshold and based on that adjust the checkpointing
> interval
> >> > > > > accordingly
> >> > > > > > >
> >> > > > > > > I am not sure this approach can address the target use-case
> >> in a
> >> > > > better
> >> > > > > > > way. In the target use-case, we would like to HybridSource
> to
> >> > > trigger
> >> > > > > > > checkpoint more frequently when it is read the Kafka Source
> >> (than
> >> > > > when
> >> > > > > it
> >> > > > > > > is reading the HDFS source). We would need to set a flag for
> >> the
> >> > > > > > checkpoint
> >> > > > > > > trigger to know which source the HybridSource is reading
> from.
> >> > But
> >> > > > IMO
> >> > > > > > the
> >> > > > > > > approach is less intuitive and more complex than having the
> >> > > > > HybridSource
> >> > > > > > > invoke upperBoundCheckpointingInterval() directly once it is
> >> > > reading
> >> > > > > > Kafka
> >> > > > > > > Source.
> >> > > > > > >
> >> > > > > > > Maybe I did not understand the alternative approach rightly.
> >> I am
> >> > > > happy
> >> > > > > > to
> >> > > > > > > discuss more on this topic. WDYT?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Dong
> >> > > > > > >
> >> > > > > > > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <
> >> > > > pnowoj...@apache.org>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi,
> >> > > > > > > >
> >> > > > > > > > Thanks for the proposal. However, are you sure that the
> >> > > > > > > > OperatorCoordinator is the right place to place such
> logic?
> >> > What
> >> > > > > would
> >> > > > > > > > happen if there are two (or more) operator coordinators
> with
> >> > > > > > conflicting
> >> > > > > > > > desired checkpoint trigger behaviour? If one source is
> >> > > processing a
> >> > > > > > > backlog
> >> > > > > > > > and the other is already processing real time data, I
> would
> >> > > assume
> >> > > > > that
> >> > > > > > > in
> >> > > > > > > > most use cases you would like to still have the longer
> >> > > > checkpointing
> >> > > > > > > > interval, not the shorter one. Also apart from that, it
> >> might
> >> > be
> >> > > a
> >> > > > > bit
> >> > > > > > > > confusing and not user friendly to have multiple places
> that
> >> > can
> >> > > > > > override
> >> > > > > > > > the checkpointing behaviour in a different way.
> >> > > > > > > >
> >> > > > > > > > FIY in the past, we had some discussions about similar
> >> requests
> >> > > and
> >> > > > > > back
> >> > > > > > > > then we chose to keep the system simpler, and exposed a
> more
> >> > > > generic
> >> > > > > > REST
> >> > > > > > > > API checkpoint triggering mechanism. I know that having to
> >> > > > implement
> >> > > > > > such
> >> > > > > > > > logic outside of Flink and having to call REST calls to
> >> trigger
> >> > > > > > > checkpoints
> >> > > > > > > > might not be ideal, but that's already implemented and is
> >> > simple
> >> > > > from
> >> > > > > > the
> >> > > > > > > > perspective of Flink.
> >> > > > > > > >
> >> > > > > > > > I don't know, maybe instead of adding this logic to
> operator
> >> > > > > > > coordinators,
> >> > > > > > > > `CheckpointCoordinator` should have a pluggable
> >> > > > `CheckpointTrigger`,
> >> > > > > > that
> >> > > > > > > > the user could configure like a `MetricReporter`. The
> >> default
> >> > one
> >> > > > > would
> >> > > > > > > be
> >> > > > > > > > just periodically triggering checkpoints. Maybe
> >> > > > > > > > `BacklogDynamicCheckpointTrigger` could look at
> metrics[1],
> >> > check
> >> > > > if
> >> > > > > > > > `pendingRecords` for some source has exceeded the
> configured
> >> > > > > threshold
> >> > > > > > > and
> >> > > > > > > > based on that adjust the checkpointing interval
> accordingly?
> >> > This
> >> > > > > would
> >> > > > > > > at
> >> > > > > > > > least address some of my concerns.
> >> > > > > > > >
> >> > > > > > > > WDYT?
> >> > > > > > > >
> >> > > > > > > > Best,
> >> > > > > > > > Piotrek
> >> > > > > > > >
> >> > > > > > > > [1]
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >> > > > > > > >
> >> > > > > > > > wt., 9 maj 2023 o 19:11 Yunfeng Zhou <
> >> > > flink.zhouyunf...@gmail.com>
> >> > > > > > > > napisał(a):
> >> > > > > > > >
> >> > > > > > > >> Hi all,
> >> > > > > > > >>
> >> > > > > > > >> Dong(cc'ed) and I are opening this thread to discuss our
> >> > > proposal
> >> > > > to
> >> > > > > > > >> support dynamically triggering checkpoints from
> operators,
> >> > which
> >> > > > has
> >> > > > > > > >> been documented in FLIP-309
> >> > > > > > > >> <
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> >> > > > > > > >> >.
> >> > > > > > > >>
> >> > > > > > > >> With the help of the ability proposed in this FLIP, users
> >> > could
> >> > > > > > > >> improve the performance of their Flink job in cases like
> >> when
> >> > > the
> >> > > > > job
> >> > > > > > > >> needs to process both historical batch data and real-time
> >> > > > streaming
> >> > > > > > > >> data, by adjusting the checkpoint triggerings in
> different
> >> > > phases
> >> > > > > of a
> >> > > > > > > >> HybridSource or CDC source.
> >> > > > > > > >>
> >> > > > > > > >> This proposal would be a fundamental component in the
> >> effort
> >> > to
> >> > > > > > > >> further unify Flink's batch and stream processing
> ability.
> >> > > Please
> >> > > > > feel
> >> > > > > > > >> free to reply to this email thread and share with us your
> >> > > > opinions.
> >> > > > > > > >>
> >> > > > > > > >> Best regards.
> >> > > > > > > >>
> >> > > > > > > >> Dong and Yunfeng
> >> > > > > > > >>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >

Reply via email to