Hi, Piotr & Dong.

Thanks for the discussion.

IMO, the counter-proposal is not a good idea. I have the following
concerns.

1. It is hard to find the checkpoint that actually failed.
If some other error also causes checkpoint failures, we have to inspect
every failed checkpoint to find it.

2. It is more confusing for users.
Some users only know the feature, not how we implement it. The failed
checkpoints may make them think the job is unhealthy.

3. Users should be able to set the checkpoint interval for the new backlog
state.
I think it is better to provide a setting that lets users change the
checkpoint interval in the new backlog state. A hard-coded interval (5x /
10x) is not flexible enough.
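
To make point 3 concrete, here is a rough sketch in plain Java. It is not
Flink code, and the class and method names are made up for illustration. It
assumes the counter-proposal behaves as described further down in this
thread: the checkpoint coordinator keeps triggering at the base interval and
the source rejects triggers until the desired backlog interval has elapsed.
Under that assumption the effective interval can only be a multiple of the
base interval.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class BacklogIntervalSketch {

    private BacklogIntervalSketch() {}

    /**
     * Effective interval when the source rejects triggers until at least
     * desiredMinutes have passed: the smallest multiple of baseMinutes that
     * is greater than or equal to desiredMinutes.
     */
    static long effectiveIntervalWhenSkipping(long baseMinutes, long desiredMinutes) {
        if (baseMinutes <= 0 || desiredMinutes <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        // Integer ceiling of desiredMinutes / baseMinutes.
        long triggersPerAccepted = (desiredMinutes + baseMinutes - 1) / baseMinutes;
        return triggersPerAccepted * baseMinutes;
    }

    /** Number of triggers the source has to reject for each accepted one. */
    static long rejectedPerAccepted(long baseMinutes, long desiredMinutes) {
        return effectiveIntervalWhenSkipping(baseMinutes, desiredMinutes) / baseMinutes - 1;
    }

    /** Interval when the multiplier is hard-coded (e.g. 5x or 10x). */
    static long hardCodedInterval(long baseMinutes, long factor) {
        if (baseMinutes <= 0 || factor <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        return baseMinutes * factor;
    }

    public static void main(String[] args) {
        long base = 3;    // execution.checkpointing.interval, in minutes
        long desired = 7; // interval the user wants during backlog, in minutes
        System.out.println("skip-based effective interval: "
                + effectiveIntervalWhenSkipping(base, desired) + " min");
        System.out.println("rejected triggers per accepted checkpoint: "
                + rejectedPerAccepted(base, desired));
        System.out.println("hard-coded 5x interval: "
                + hardCodedInterval(base, 5) + " min");
    }
}
```

With a 3 minute base interval and a desired 7 minutes, skipping gives 9
minutes (2 rejected triggers per accepted checkpoint, as in Dong's example)
and a hard-coded 5x gives 15 minutes. Neither can express 7 minutes, which a
dedicated setting such as execution.checkpointing.interval-during-backlog
can.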

Best,
Hang

On Wed, Jul 5, 2023 at 07:33 Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Any suggestion on how we can practically move forward to address the target
> use-case?
>
> My understanding is that the current proposal does not have any
> correctness/performance issues. And it allows the extension to support all
> the extra use-case without having to throw away the proposed APIs.
>
> If you prefer to have a better solution with simpler APIs and yet same or
> better correctness/performance for the target use-case, could you please
> kindly explain its API design so that we can continue the discussion?
>
>
> Best,
> Dong
>
> On Mon, Jul 3, 2023 at 6:39 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Please see my comments inline.
> >
> > On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > wrote:
> >
> >> Hi Dong,
> >>
> >> Starting from the end:
> >>
> >> > It seems that the only benefit of this approach is to avoid
> >> > adding SplitEnumeratorContext#setIsProcessingBacklog.
> >>
> >> Yes, that's the major benefit of this counter-proposal.
> >>
> >> > In the target use-case, users still want to checkpoint (though at a
> >> > larger interval) when there is backlog. And HybridSource needs to know
> >> > the expected checkpoint interval during backlog in order to determine
> >> > whether it should keep throwing CheckpointException. Thus, we still
> >> > need to add execution.checkpointing.interval-during-backlog for users
> >> > to specify this information.
> >> >
> >> > The downside of this approach is that it is hard to enforce the
> >> > semantics specified by
> >> > execution.checkpointing.interval-during-backlog. For example, suppose
> >> > execution.checkpointing.interval = 3 minutes and
> >> > execution.checkpointing.interval-during-backlog = 7 minutes. During
> >> > the backlog phase, the checkpoint coordinator will still trigger a
> >> > checkpoint once every 3 minutes. HybridSource will need to reject 2
> >> > out of every 3 checkpoint invocations, and the effective checkpoint
> >> > interval will be 9 minutes.
> >>
> >> Does it really matter what the exact value of the longer interval is?
> >> Can't we hard-code it to be 5x or 10x of the base checkpoint interval?
> >> If there is a noticeable overhead from the base interval slowing down
> >> the record processing rate, checkpointing 5x or 10x less often would
> >> fix the performance issue for the vast majority of users. So a source
> >> could just skip 4 out of 5 or 9 out of 10 checkpoints.
> >>
> >
> > Yes, I think the exact value of the longer interval matters.
> >
> > The main reason we need two intervals is for jobs that have a two-phase
> > commit sink. The short interval typically represents how long a user can
> > accept the two-phase commit sink buffering data (since it can only emit
> > data when a checkpoint is triggered). And the long interval typically
> > represents the maximum amount of duplicate work (in terms of time) that
> > a job needs to redo after failover.
> >
> > Since there is no intrinsic relationship between the data buffer interval
> > (related to processing latency) and the failover boundary, I don't think
> > we can hardcode it to be 5x or 10x of the base checkpoint interval.
> >
> >
> >> Alternatively we could introduce a config option like:
> >>
> >> execution.checkpointing.long-interval
> >>
> >> that might be re-used in the future, with more fancy algorithms, but I
> >> don't see
> >> much value in doing that.
> >
> >
> >> > Overall, I think the solution is a bit hacky. I think it is preferred
> >> > to throw an exception only when there is indeed an error. If we don't
> >> > need to check a checkpoint, it is preferred to not trigger the
> >> > checkpoint in the first place. And I think adding
> >> > SplitEnumeratorContext#setIsProcessingBacklog is probably not that
> >> > much of a big deal.
> >>
> >> Yes it's hacky, but at least it doesn't require extending the Public API
> >> for a
> >> quite limited solution, that only targets one or two sources that are
> >> rarely used.
> >>
> >
> > I am not sure it is fair to say MySQL CDC source is "rarely used". The
> > ververica/flink-cdc-connectors GitHub repo has 4K+ stars. Also, note
> > that the proposed feature can be useful for CDC sources with an internal
> > "backlog phase". Its usage is not limited to just the two sources
> > mentioned in the FLIP.
> >
> >
> >>
> >> ================
> >>
> >> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
> >> feeling that this is overly complicated and would require every
> >> operator/function to handle that.
> >>
> >> Yes it would cover even more use cases, at the cost of complicating the
> >> system by a lot. IMO it looks like something we could do if there were
> >> indeed high demand for such a feature, after we provide some baseline
> >> generic solution that doesn't require any configuration.
> >>
> >> I have a feeling that by just statically looking at the shape of the job
> >> graph and how
> >> it is connected, we could deduce almost the same things.
> >>
> >
> > Note that pretty much every FLIP will address more use-cases at the cost
> > of complicating the system. I understand you have the feeling that this
> > complexity is not worthwhile. However, as we can see from the comments in
> > this thread and the votes in the voting thread, many
> > committers/developers/users actually welcome the feature introduced in
> > this FLIP.
> >
> > I am happy to work with you together to find a more generic and simpler
> > solution, as long as that solution can address the target use-case
> > without hurting user-experience. The alternative solution which you have
> > mentioned so far, unfortunately, still has drawbacks as mentioned
> > earlier.
> >
> >
> >> Also:
> >>
> >> > - the FLIP suggests to use the long checkpointing interval as long as
> >> >   any subtask is processing the backlog. Are you sure that's the right
> >> >   call? What if other sources are producing fresh records, and those
> >> >   fresh records are reaching sinks? It could happen either with
> >> >   disjoint JobGraph, embarrassingly parallel JobGraph (no
> >> >   keyBy/unions/joins), or even with keyBy. Fresh records can slip
> >> >   using a not backpressured input channel through generally
> >> >   backpressured keyBy exchange. How should we handle that? This
> >> >   problem I think will affect every solution, including my previously
> >> >   proposed generic one, but we should discuss how to handle that as
> >> >   well.
> >>
> >> By this I didn't necessarily mean that we have to solve it right now.
> >>
> >> ================
> >>
> >> > The comments above seem kind of "abstract". I am hoping to understand
> >> > more technical details behind these comments so that we can see how
> >> > to address the concern.
> >>
> >> Over the span of this discussion I think I have already explained many
> >> times what
> >> bothers me in the current proposal.
> >>
> >
> > If I understand you correctly, your concern is that the solution is not
> > generic, can not address the extra use-case you want, and is too
> > complicated.
> >
> > I think it is fair to say that no FLIP can be fully generic to address
> > all use-cases, just like the fact that Flink is still not perfect and
> > still needs FLIPs to improve its performance/usage. And whether an API is
> > too complicated really depends on whether there exists a better option.
> >
> > Fairly speaking, different people can have different opinions on whether
> > a proposal is generic and whether it is too complicated. At least from
> > the comments from other developers in this thread and in the voting
> > thread, many developers and users actually like this current proposal. I
> > hope you understand that your concerns mentioned above are subjective
> > and not necessarily shared by other developers.
> >
> > Honestly speaking, if we block this FLIP just because anyone thinks it
> > can be better (yet without any concrete proposal for making it better),
> > I feel it is not a good result for other developers and users who would
> > like to have this feature to address their existing pain points.
> >
> > I am wondering if you can be a bit more lenient, consider the opinion of
> > other developers (not just me) who have voted, and allow us to make
> > incremental progress even though you might find it not meeting your
> > expectations in its current form?
> >
> >
> >> > For example, even if a FLIP does not address all use-cases
> >> > (which is arguably true for every FLIP), its solution does not
> >> > necessarily need to be thrown away later as long as it is extensible
> >>
> >> That's my main point. I haven't yet seen how the proposals from this
> >> FLIP could be extended to cover the generic use case in a way that:
> >>  - Would work out of the box, for all or majority of the properly
> >> implemented sources.
> >>
> >
> > I would argue that for the target use-case mentioned in the motivation
> > section, it is impossible to address the use-case without any code change
> > from the source, and still have the same stability as the current
> > proposal. The reason is that when the source does not have event-time,
> > we can not correctly derive whether the MySQL CDC source is in the
> > snapshot/binlog phase by just looking at the processing time related
> > metrics or backpressure.
> >
> > We have discussed this issue in detail in the earlier emails of this
> > thread. I also mentioned that I will add follow-up FLIPs to make use of
> > the event-time metrics and backpressure metrics to derive backlog
> > status. But that can not replace the capability for source to explicitly
> > specify its metrics.
> >
> >
> >
> >>  - Would require zero or very minimal configuration/input from the user.
> >> Especially
> >>
> >
> > Note that the current proposal only requires the user to specify one
> > extra config, namely execution.checkpointing.interval-during-backlog. I
> > don't think we are able to reduce the extra config to be zero, due to
> > the reason explained above (i.e. separate interval for failover and data
> > freshness). Therefore, the configuration from user is already minimal.
> >
> >
> >>    wouldn't require implementing some custom things in every source.
> >>  - Could be made to work well enough in the (vast?) majority of the use
> >> cases.
> >
> >
> > I think you are talking about the case of delaying checkpoint interval
> > when backpressure is high etc. I would argue this is a use-case not
> > targeted by this FLIP and it can be addressed in a follow-up FLIP.
> >
> >
> >>
> >
> >
> >> > So we probably need to understand specifically why the proposed APIs
> >> would be thrown away.
> >>
> >> As I have mentioned many times why that's the case:
> >> 1. This solution is not generic enough
> >> 2. I can see solutions that wouldn't require modification of every
> >> source
> >> 3. They would have zero overlap with the interfaces extension from this
> >> FLIP
> >>
> >
> >
> > I have the feeling our discussion is kind of in a loop, where you ask for
> > a solution without any change to the source (so that it is generic), I
> > explain why I am not able to find such a solution and the drawback of
> > your proposed solution, and then you repeat the same ask and insist this
> > is possible.
> >
> > If you can find a solution that wouldn't require modification of every
> > source and still address the target use-case well, could you please
> > kindly rephrase your solution so that we can revisit it?
> >
> > I assume this solution would not require extra config from users, would
> > not cause the job to use long checkpoint interval due to random/short
> > traffic spikes, and would not cause the job to use the short interval
> > when the job is still reading backlog data.
> >
> > I would be happy to be proven wrong if you or anyone else can provide
> > such a solution without the aforementioned drawbacks. I just hope we
> > don't block the FLIP forever for a goal that no one can address.
> >
> > Best,
> > Dong
> >
> >
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Sat, Jul 1, 2023 at 17:01 Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hi Piotr,
> >> >
> >> > Thank you for providing further suggestions to help improve the API.
> >> > Please see my comments inline.
> >> >
> >> > On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org>
> >> > wrote:
> >> >
> >> > > Hey,
> >> > >
> >> > > Sorry for a late reply, I was OoO for a week. I have three things to
> >> > > point out.
> >> > >
> >> > > 1. ===============
> >> > >
> >> > > The updated proposal is indeed better, but to be honest I still
> >> > > don't like it, for mostly the same reasons that I have mentioned
> >> > > earlier:
> >> > > - only a partial solution, that doesn't address all use cases, so we
> >> > >   would need to throw it away sooner or later
> >> > > - I don't see and it hasn't been discussed how to make it work out
> >> > >   of the box for all sources
> >> > > - somehow complicating API for people implementing Sources
> >> > > - it should work out of the box for most of the sources, or at least
> >> > >   to have that potential in the future
> >> > >
> >> >
> >> > The comments above seem kind of "abstract". I am hoping to understand
> >> > more technical details behind these comments so that we can see how
> >> > to address the concern. For example, even if a FLIP does not address
> >> > all use-cases (which is arguably true for every FLIP), its solution
> >> > does not necessarily need to be thrown away later as long as it is
> >> > extensible. So we probably need to understand specifically why the
> >> > proposed APIs would be thrown away.
> >> >
> >> > Similarly, we would need to understand if there is a better design to
> >> > make the API simpler and work out of the box etc. in order to decide
> >> > how to address these comments.
> >> >
> >> >
> >> > > On top of that:
> >> > > - the FLIP I think is missing how to hook up SplitEnumeratorContext
> >> > >   and CheckpointCoordinator to pass "isProcessingBacklog"
> >> > >
> >> >
> >> > I think it can be passed via the following function chain:
> >> > - CheckpointCoordinator invokes
> >> >   OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
> >> >   coordinatorsToCheckpoint) to get this information.
> >> > - OperatorCoordinatorHolder implements
> >> >   OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
> >> >   OperatorCoordinator#isProcessingBacklog (via coordinator)
> >> > - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog
> >> >   and returns SourceCoordinatorContext#isProcessingBacklog
> >> > - SourceCoordinatorContext will implement
> >> >   SplitEnumeratorContext#setIsProcessingBacklog and stores the given
> >> >   information in a variable.
> >> >
> >> > Note that it involves only internal API. We might be able to find a
> >> > simpler solution with fewer functions on the path. As long as the
> >> > above solution works without any performance or correctness issues, I
> >> > think maybe we should focus on the public API design and discuss the
> >> > implementation in the PR review?
> >> >
> >> > > - the FLIP suggests to use the long checkpointing interval as long
> >> > >   as any subtask is processing the backlog. Are you sure that's the
> >> > >   right call? What if other sources are producing fresh records, and
> >> > >   those fresh records are reaching sinks? It could happen either
> >> > >   with disjoint JobGraph, embarrassingly parallel JobGraph (no
> >> > >   keyBy/unions/joins), or even with keyBy. Fresh records can slip
> >> > >   using a not backpressured input channel through generally
> >> > >   backpressured keyBy exchange. How should we handle that? This
> >> > >   problem I think will affect every solution, including my
> >> > >   previously proposed generic one, but we should discuss how to
> >> > >   handle that as well.
> >> > >
> >> > >
> >> >
> >> > Good question. Here is my plan to improve the solution in a follow-up
> >> > FLIP:
> >> >
> >> > - Let every subtask of every source operator emit
> >> >   RecordAttributes(isBacklog=..)
> >> > - Let every subtask of every operator handle the RecordAttributes
> >> >   received from inputs and emit RecordAttributes to downstream
> >> >   operators. Flink runtime can derive this information for every
> >> >   one-input operator. For an operator with two inputs, if one input
> >> >   has isBacklog=true and the other has isBacklog=false, the operator
> >> >   should determine the isBacklog for its output records based on its
> >> >   semantics.
> >> > - If there exists a subtask of a two-phase commit operator with
> >> >   isBacklog=false, the operator should let JM know so that the JM will
> >> >   use the short checkpoint interval (for data freshness). Otherwise,
> >> >   JM will use the long checkpoint interval.
> >> >
> >> > The above solution guarantees that, if every two-input operator has
> >> > explicitly specified its isBacklog based on the inputs' isBacklog,
> >> > then the JM will use the short checkpoint interval if and only if it
> >> > is useful for at least one subtask of one two-phase commit operator.
> >> >
> >> > Note that even the above solution might not be perfect. Suppose one
> >> > subtask of the two-phase commit operator has isBacklog=false, but
> >> > every other subtask of this operator has isBacklog=true, due to load
> >> > imbalance. In this case, it might be beneficial to use the long
> >> > checkpoint interval to improve the average data freshness for this
> >> > operator. However, as we get into more edge cases, the solution will
> >> > become more complicated (e.g. providing more APIs for users to specify
> >> > their intended strategy) and there will be fewer additional benefits
> >> > (because these scenarios are less common).
> >> >
> >> > Also, note that we can support the solution described above without
> >> > throwing away any public API currently proposed in FLIP-309. More
> >> > specifically, we still need
> >> > execution.checkpointing.interval-during-backlog. Sources such as
> >> > HybridSource and MySQL CDC source can still use
> >> > setIsProcessingBacklog() to specify the backlog status. We just need
> >> > to update setIsProcessingBacklog() to emit
> >> > RecordAttributes(isBacklog=..) upon invocation.
> >> >
> >> > I hope the above solution is reasonable and can address most of your
> >> > concerns. And I hope we can use FLIP-309 to solve a large chunk of the
> >> > existing problems in Flink 1.18 release and make further improvements
> >> > in followup FLIPs. What do you think?
> >> >
> >> >
> >> >
> >> > > 2. ===============
> >> > >
> >> > > Regarding the current proposal, there might be a way to make it
> >> > > actually somehow generic (but not pluggable). But it might require
> >> > > slightly different interfaces. We could keep the idea that
> >> > > SourceCoordinator/SplitEnumerator is responsible for switching
> >> > > between slow/fast processing modes. It could be implemented to
> >> > > achieve something like in the FLIP-309 proposal, but apart from
> >> > > that, the default behaviour would be a built-in mechanism working
> >> > > like this:
> >> > > 1. Every SourceReaderBase checks its metrics and its state, to
> >> > >    decide if it considers itself as "processingBacklog" or
> >> > >    "veryBackpressured". The base implementation could do it via a
> >> > >    similar mechanism as I was proposing previously, via looking at
> >> > >    the busy/backPressuredTimeMsPerSecond, pendingRecords and
> >> > >    processing rate.
> >> > > 2. SourceReaderBase could send an event with
> >> > >    "processingBacklog"/"veryBackpressured" state.
> >> > > 3. SourceCoordinator would collect those events, and decide what it
> >> > >    should do, whether it should switch the whole source to the
> >> > >    "processingBacklog"/"veryBackpressured" state or not.
> >> > >
> >> > > That could eventually provide a generic solution that works for
> >> > > every source that reports the required metrics. Each source
> >> > > implementation could decide whether to use that default behaviour,
> >> > > or if maybe it's better to override the default, or combine default
> >> > > with something custom (like HybridSource).
> >> > >
> >> > > And as a first step, we could implement that mechanism only on the
> >> > > SourceCoordinator side, without events, without the default generic
> >> > > solution and use it in the HybridSource/MySQL CDC.
> >> > >
> >> > > This approach has some advantages compared to my previous proposal:
> >> > >   + no need to tinker with metrics and pushing metrics from TMs to
> >> > >     JM
> >> > >   + somehow communicating this information via Events seems a bit
> >> > >     cleaner to me and avoids problems with freshness of the metrics
> >> > > And some issues:
> >> > >   - I don't know if it can be made pluggable in the future. Could a
> >> > >     user implement a custom `CheckpointTrigger` that would
> >> > >     automatically work with all/most of the pre-existing sources?
> >> > >   - I don't know if it can be expanded if needed in the future, to
> >> > >     make decisions based on operators in the middle of a jobgraph.
> >> > >
> >> >
> >> > Thanks for the proposal. Overall, I agree it is valuable to be able to
> >> > determine the isProcessingBacklog based on the source reader metrics.
> >> >
> >> > I will probably suggest making the following changes upon your idea:
> >> > - Instead of letting the source reader send events to the source
> >> >   coordinator, the source reader can emit
> >> >   RecordAttributes(isBacklog=..) as described earlier. We will let
> >> >   the two-phase commit operators decide whether they need the short
> >> >   checkpoint interval.
> >> > - We consider isProcessingBacklog=true when watermarkLag is larger
> >> >   than a threshold.
> >> >
> >> > This is a nice addition. But I think we still need extra information
> >> > from users (e.g. the threshold above which the watermarkLag or
> >> > backPressuredTimeMsPerSecond is considered too high) with extra public
> >> > APIs for this feature to work reliably. This is because there is no
> >> > default algorithm that works in all cases without extra specification
> >> > from users, due to the issues around the default algorithm we
> >> > discussed previously.
> >> >
> >> > Overall, I think the current proposal in FLIP-309 is a first step
> >> > towards addressing these problems. The API for source enumerator to
> >> > explicitly set isProcessingBacklog based on its status is useful even
> >> > if we can support metrics-based solutions.
> >> >
> >> > If that looks reasonable, can we agree to make incremental improvement
> >> > and work on the metrics-based solution in a followup FLIP?
> >> >
> >> >
> >> > >
> >> > > 3. ===============
> >> > >
> >> > > Independent of that, during some brainstorming between me, Chesnay
> >> > > and Stefan Richter, an idea popped up, that I think could be a
> >> > > counter proposal as an intermediate solution that probably
> >> > > effectively works the same way as current FLIP-309.
> >> > >
> >> > > Inside a HybridSource, from its SplitEnumerator#snapshotState
> >> > > method, couldn't you throw an exception like
> >> > > `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or
> >> > > `new CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
> >> > > Actually we could also introduce a dedicated
> >> > > `CheckpointFailureReason` for that purpose and handle it some
> >> > > special way in some places (like maybe hide such rejected
> >> > > checkpoints from the REST API/WebUI). We could elaborate on this a
> >> > > bit more, but after a brief thinking I could see it actually working
> >> > > well enough without any public facing changes. But I might be wrong
> >> > > here.
> >> > >
> >> > > If this feature actually gains traction, we could expand it to
> >> > > something more sophisticated available via a public API in the
> >> > > future.
> >> > >
> >> >
> >> > In the target use-case, users still want to checkpoint (though at a
> >> > larger interval) when there is backlog. And HybridSource needs to know
> >> > the expected checkpoint interval during backlog in order to determine
> >> > whether it should keep throwing CheckpointException. Thus, we still
> >> > need to add execution.checkpointing.interval-during-backlog for users
> >> > to specify this information.
> >> >
> >> > It seems that the only benefit of this approach is to avoid
> >> > adding SplitEnumeratorContext#setIsProcessingBacklog.
> >> >
> >> > The downside of this approach is that it is hard to enforce the
> >> > semantics specified by
> >> > execution.checkpointing.interval-during-backlog. For example, suppose
> >> > execution.checkpointing.interval = 3 minutes and
> >> > execution.checkpointing.interval-during-backlog = 7 minutes. During
> >> > the backlog phase, the checkpoint coordinator will still trigger a
> >> > checkpoint once every 3 minutes. HybridSource will need to reject 2
> >> > out of every 3 checkpoint invocations, and the effective checkpoint
> >> > interval will be 9 minutes.
> >> >
> >> > Overall, I think the solution is a bit hacky. I think it is preferred
> >> > to throw an exception only when there is indeed an error. If we don't
> >> > need to check a checkpoint, it is preferred to not trigger the
> >> > checkpoint in the first place. And I think adding
> >> > SplitEnumeratorContext#setIsProcessingBacklog is probably not that
> >> > much of a big deal.
> >> >
> >> > Thanks for all the comments. I am looking forward to your thoughts.
> >> >
> >> > Best,
> >> > Dong
> >> >
> >> >
> >> > >
> >> > > ===============
> >> > >
> >> > > Sorry for disturbing this FLIP discussion and voting.
> >> > >
> >> > > Best,
> >> > > Piotrek
> >> > >
> >> > > On Thu, Jun 29, 2023 at 05:08 feng xiangyu <xiangyu...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi Dong,
> >> > > >
> >> > > > Thanks for your quick reply. I think this has truly solved our
> >> > > > problem and will enable us to upgrade our existing jobs more
> >> > > > seamlessly.
> >> > > >
> >> > > > Best,
> >> > > > Xiangyu
> >> > > >
> >> > > > On Thu, Jun 29, 2023 at 10:50 Dong Lin <lindon...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Feng,
> >> > > > >
> >> > > > > Thanks for the feedback. Yes, you can configure the
> >> > > > > execution.checkpointing.interval-during-backlog to effectively
> >> > > > > disable checkpoint during backlog.
> >> > > > >
> >> > > > > Prior to your comment, the FLIP allowed users to do this by
> >> > > > > setting the config value to something large (e.g. 365 days).
> >> > > > > After thinking about this more, we think it is more usable to
> >> > > > > allow users to achieve this goal by setting the config value to
> >> > > > > 0. This is consistent with the existing behavior of
> >> > > > > execution.checkpointing.interval -- the checkpoint is disabled
> >> > > > > if user sets execution.checkpointing.interval to 0.
> >> > > > >
> >> > > > > We have updated the description of
> >> > > > > execution.checkpointing.interval-during-backlog
> >> > > > > to say the following:
> >> > > > > ... it is not null, the value must either be 0, which means the
> >> > > > > checkpoint is disabled during backlog, or be larger than or
> >> > > > > equal to execution.checkpointing.interval.
> >> > > > >
> >> > > > > Does this address your need?
> >> > > > >
> >> > > > > Best,
> >> > > > > Dong
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu
> >> > > > > <xiangyu...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hi Dong and Yunfeng,
> >> > > > > >
> >> > > > > > Thanks for the proposal, your FLIP sounds very useful from my
> >> > > > > > perspective. In our business, when using hybrid source in
> >> > > > > > production we also met the problem described in your FLIP.
> >> > > > > > In our solution, we tend to skip making any checkpoints before
> >> > > > > > all batch tasks have finished and resume the periodic
> >> > > > > > checkpoint only in the streaming phase. With this FLIP, we can
> >> > > > > > solve our problem in a more generic way.
> >> > > > > >
> >> > > > > > However, I am wondering, if we still want to skip making any
> >> > > > > > checkpoints during the historical phase, can we set the
> >> > > > > > configuration
> >> > > > > > "execution.checkpointing.interval-during-backlog" to "-1" to
> >> > > > > > cover this case?
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Xiangyu
> >> > > > > >
> >> > > > > > On Wed, Jun 28, 2023 at 16:30 Hang Ruan <ruanhang1...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Thanks for Dong and Yunfeng's work.
> >> > > > > > >
> >> > > > > > > The FLIP looks good to me. This new version is easier to
> >> > > > > > > understand.
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Hang
> >> > > > > > >
> >> > > > > > > On Tue, Jun 27, 2023 at 16:53 Dong Lin <lindon...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > > > Thanks Jark, Jingsong, and Zhu for the review!
> >> > > > > > > >
> >> > > > > > > > Thanks Zhu for the suggestion. I have updated the
> >> > > > > > > > configuration name as suggested.
> >> > > > > > > >
> >> > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and
> >> > > > > > > > > driving this discussion.
> >> > > > > > > > >
> >> > > > > > > > > The new design looks generally good to me. Increasing
> >> > > > > > > > > the checkpoint interval when the job is processing
> >> > > > > > > > > backlogs is easier for users to understand and can help
> >> > > > > > > > > in more scenarios.
> >> > > > > > > > >
> >> > > > > > > > > I have one comment about the new configuration.
> >> > > > > > > > > Naming the new configuration
> >> > > > > > > > > "execution.checkpointing.interval-during-backlog" would
> >> > > > > > > > > be better according to Flink config naming convention.
> >> > > > > > > > > It is also because nested config keys should be
> >> > > > > > > > > avoided. See FLINK-29372 for more details.
> >> > > > > > > > >
> >> > > > > > > > > Thanks,
> >> > > > > > > > > Zhu
> >> > > > > > > > >
> >> > > > > > > > > On Tue, Jun 27, 2023 at 15:45 Jingsong Li
> >> > > > > > > > > <jingsongl...@gmail.com> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > Looks good to me!
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and
> >> > > > > > > > > > design.
> >> > > > > > > > > >
> >> > > > > > > > > > Best,
> >> > > > > > > > > > Jingsong
> >> > > > > > > > > >
> >> > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > Thank you Dong for driving this FLIP.
> >> > > > > > > > > > >
> >> > > > > > > > > > > The new design looks good to me!
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Jark
> >> > > > > > > > > > >
> >> > > > > > > > > > > > On Jun 27, 2023, at 2:38 PM, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Thank you Leonard for the review!
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Hi Piotr, do you have any comments on the latest proposal?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I am wondering if it is OK to start the voting thread this week.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >> Thanks Dong for driving this FLIP forward!
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Introducing the `backlog status` concept for Flink jobs makes sense to me
> >> > > > > > > > > > > >> for the following reasons:
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> From the concept/API design perspective, it is more general and natural
> >> > > > > > > > > > > >> than the proposals above, as it can be used in HybridSource for bounded
> >> > > > > > > > > > > >> records, in CDC sources for history snapshots, and in general sources like
> >> > > > > > > > > > > >> KafkaSource for historical messages.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> From the user cases/requirements perspective, I have seen many users
> >> > > > > > > > > > > >> manually set a larger checkpoint interval during backfilling and then set
> >> > > > > > > > > > > >> a shorter checkpoint interval for real-time processing in their production
> >> > > > > > > > > > > >> environments as a Flink application optimization. Now the Flink framework
> >> > > > > > > > > > > >> can apply this optimization without requiring the user to change the
> >> > > > > > > > > > > >> checkpoint interval and restart the job multiple times.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Following the support for a larger checkpoint interval for jobs under
> >> > > > > > > > > > > >> backlog status in the current FLIP, we can explore supporting larger
> >> > > > > > > > > > > >> parallelism/memory/CPU for jobs under backlog status in the future.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> In short, the updated FLIP looks good to me.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Best,
> >> > > > > > > > > > > >> Leonard
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Hi Piotr,
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog concept.
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> After discussing with Becket Qin and thinking about this more, I agree it
> >> > > > > > > > > > > >>> is a better idea to add a top-level concept to all source operators to
> >> > > > > > > > > > > >>> address the target use-case.
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> The main reason that changed my mind is that isProcessingBacklog can be
> >> > > > > > > > > > > >>> described as an inherent/natural attribute of every source instance, and
> >> > > > > > > > > > > >>> its semantics do not need to depend on any specific checkpointing policy.
> >> > > > > > > > > > > >>> Also, we can hardcode the isProcessingBacklog behavior for the sources we
> >> > > > > > > > > > > >>> have considered so far (e.g. HybridSource and MySQL CDC source) without
> >> > > > > > > > > > > >>> asking users to explicitly configure the per-source behavior, which indeed
> >> > > > > > > > > > > >>> provides a better user experience.
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> I have updated the FLIP based on the latest suggestions. The latest FLIP
> >> > > > > > > > > > > >>> no longer introduces per-source config that can be used by end-users.
> >> > > > > > > > > > > >>> While I agree with you that CheckpointTrigger can be a useful feature to
> >> > > > > > > > > > > >>> address additional use-cases, I am not sure it is necessary for the
> >> > > > > > > > > > > >>> use-case targeted by FLIP-309. Maybe we can introduce CheckpointTrigger
> >> > > > > > > > > > > >>> separately in another FLIP?
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Can you help take another look at the updated FLIP?
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> Best,
> >> > > > > > > > > > > >>> Dong
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >> > > > > > > > > > > >>>
> >> > > > > > > > > > > >>>> Hi Dong,
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> >> > > > > > > > > > > >>>>> "backpressured" at a given time (due to random traffic spikes). Then at
> >> > > > > > > > > > > >>>>> any given time, the chance of the job being considered not-backpressured
> >> > > > > > > > > > > >>>>> = (1-0.01)^1000. Since we evaluate the backpressure metric once a second,
> >> > > > > > > > > > > >>>>> the estimated time for the job to be considered not-backpressured is
> >> > > > > > > > > > > >>>>> roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> This means that the job will effectively always use the longer
> >> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real concern, right?
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Sorry, I don't understand where you are getting those numbers from.
> >> > > > > > > > > > > >>>> Instead of trying to find loophole after loophole, could you try to think
> >> > > > > > > > > > > >>>> how a given loophole could be improved/solved?
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the APIs due to the
> >> > > > > > > > > > > >>>>> following reasons.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Please propose something. I don't think it's needed.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> > > > > > > > > > > >>>>> APIs of this alternative approach be more or less usable?
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Everything that you originally wanted to achieve in FLIP-309, you could
> >> > > > > > > > > > > >>>> do as well in my proposal.
> >> > > > > > > > > > > >>>> See my many mentions of the "hacky solution".
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> >> > > > > > > > > > > >>>>> checkpointing interval to change dynamically even during the unbounded
> >> > > > > > > > > > > >>>>> phase) as it claims?
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> I don't see why not.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> >> > > > > > > > > > > >>>>> FLIP-309?
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Yes
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> For example, if the APIs of this alternative approach can be decoupled
> >> > > > > > > > > > > >>>>> from the APIs currently proposed in FLIP-309, then it might be reasonable
> >> > > > > > > > > > > >>>>> to work on this extra use-case with a more advanced/complicated design
> >> > > > > > > > > > > >>>>> separately in a followup work.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> As I voiced my concerns previously, the current design of FLIP-309 would
> >> > > > > > > > > > > >>>> clog the public API and in the long run confuse the users. IMO it's
> >> > > > > > > > > > > >>>> addressing the problem in the wrong place.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> >> > > > > > > > > > > >>>>> - Have all source operators emit a metric named "processingBacklog".
> >> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the checkpointing interval to be
> >> > > > > > > > > > > >>>>> used when any source is processing backlog".
> >> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog" periodically from all source
> >> > > > > > > > > > > >>>>> operators and uses the newly added config value as appropriate.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Yes.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> The challenge with this approach is that we need to define the semantics
> >> > > > > > > > > > > >>>>> of this "processingBacklog" metric and have all source operators
> >> > > > > > > > > > > >>>>> implement this metric. I am not sure we are able to do this yet without
> >> > > > > > > > > > > >>>>> having users explicitly provide this information on a per-source basis.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source, should it emit
> >> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use the long
> >> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to process data starting
> >> > > > > > > > > > > >>>>> from now to the next 1 hour. If no, then the job might use the short
> >> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to re-process data
> >> > > > > > > > > > > >>>>> starting from 7 days ago.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Yes. The same can be said of your proposal. Your proposal has the very
> >> > > > > > > > > > > >>>> same issues: every source would have to implement it differently, most
> >> > > > > > > > > > > >>>> sources would have no idea how to properly calculate the new requested
> >> > > > > > > > > > > >>>> checkpoint interval, and for those that do know how to do that, the user
> >> > > > > > > > > > > >>>> would have to configure every source individually. Yet again we would end
> >> > > > > > > > > > > >>>> up with a system that works only partially in some special use cases
> >> > > > > > > > > > > >>>> (HybridSource), which confuses the users even more.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> That's why I think the more generic solution, working primarily on the
> >> > > > > > > > > > > >>>> same metrics that are used by various auto scaling solutions (like Flink
> >> > > > > > > > > > > >>>> K8s operator's autoscaler) would be better. I proposed the hacky solution
> >> > > > > > > > > > > >>>> to:
> >> > > > > > > > > > > >>>> 1. show you that the generic solution is simply a superset of your
> >> > > > > > > > > > > >>>>   proposal
> >> > > > > > > > > > > >>>> 2. show that, if you are adamant that busyness/backpressured/records
> >> > > > > > > > > > > >>>>   processing rate/pending records metrics wouldn't cover your use case
> >> > > > > > > > > > > >>>>   sufficiently (imo they can), you can very easily enhance this algorithm
> >> > > > > > > > > > > >>>>   with some hints from the sources, like "processingBacklog==true" to
> >> > > > > > > > > > > >>>>   short circuit the main algorithm if `processingBacklog` is available.
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> Best,
> >> > > > > > > > > > > >>>> Piotrek
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>> On Fri, Jun 16, 2023 at 4:45 AM Dong Lin <lindon...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>>>> Hi again Piotr,
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Thank you for the reply. Please see my reply inline.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>> Hi again Dong,
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> I understand that JM will get the backpressure-related metrics every
> >> > > > > > > > > > > >>>>>>> time the RestServerEndpoint receives the REST request to get these
> >> > > > > > > > > > > >>>>>>> metrics. But I am not sure if RestServerEndpoint is already always
> >> > > > > > > > > > > >>>>>>> receiving the REST metrics at regular interval (suppose there is no
> >> > > > > > > > > > > >>>>>>> human manually opening/clicking the Flink Web UI). And if it does, what
> >> > > > > > > > > > > >>>>>>> is the interval?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Good catch, I had thought that metrics are pre-emptively sent to the JM
> >> > > > > > > > > > > >>>>>> every 10 seconds.
> >> > > > > > > > > > > >>>>>> Indeed that's not the case at the moment, and that would have to be
> >> > > > > > > > > > > >>>>>> improved.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying this much overhead just
> >> > > > > > > > > > > >>>>>>> for metrics monitoring. That is the main reason I still doubt it is
> >> > > > > > > > > > > >>>>>>> true. Can you show where this 100 ms is currently configured?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra code to invoke
> >> > > > > > > > > > > >>>>>>> the REST API at 100 ms interval. Then that means we need to considerably
> >> > > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where the overhead will
> >> > > > > > > > > > > >>>>>>> increase as the number of TMs/slots increases, which may pose a risk to
> >> > > > > > > > > > > >>>>>>> the scalability of the proposed design. I am not sure we should do this.
> >> > > > > > > > > > > >>>>>>> What do you think?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Sorry. I didn't mean the metric should be reported every 100ms. I meant
> >> > > > > > > > > > > >>>>>> that "backPressuredTimeMsPerSecond (metric) would report (a value of)
> >> > > > > > > > > > > >>>>>> 100ms/s." once per metric interval (10s?).
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> >> > > > > > > > > > > >>>>> "backpressured" at a given time (due to random traffic spikes). Then at
> >> > > > > > > > > > > >>>>> any given time, the chance of the job being considered not-backpressured
> >> > > > > > > > > > > >>>>> = (1-0.01)^1000. Since we evaluate the backpressure metric once a second,
> >> > > > > > > > > > > >>>>> the estimated time for the job to be considered not-backpressured is
> >> > > > > > > > > > > >>>>> roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> This means that the job will effectively always use the longer
> >> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real concern, right?
> >> > > > > > > > > > > >>>>>
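The estimate above can be reproduced with a few lines of Python. This is only a sketch of the arithmetic, assuming (as the example does) that each subtask is backpressured independently with probability 0.01 and that each once-per-second evaluation is an independent trial, so the waiting time is geometric with mean 1/p. The variable names are illustrative and do not come from Flink.

```python
# Arithmetic from the example above, under its independence assumptions.
num_subtasks = 1000
p_backpressured = 0.01  # chance that one subtask is backpressured at a given time

# Probability that no subtask is backpressured in a single evaluation.
p_job_clear = (1 - p_backpressured) ** num_subtasks

# The metric is evaluated once per second; the expected number of evaluations
# until the first "clear" one is 1 / p (mean of a geometric distribution).
expected_seconds = 1 / p_job_clear
expected_hours = expected_seconds / 3600

print(f"p_job_clear   = {p_job_clear:.3e}")
print(f"expected wait = {expected_seconds:.0f} s = {expected_hours:.1f} h")
```

The result is very sensitive to the assumed per-subtask probability and to the independence assumption, which is part of what the two sides disagree on.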
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For example, are we
> >> > > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context that it can use to fetch
> >> > > > > > > > > > > >>>>>>> arbitrary metric values? This can help us understand what information
> >> > > > > > > > > > > >>>>>>> this user-defined CheckpointTrigger can use to make the checkpoint
> >> > > > > > > > > > > >>>>>>> decision.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> I honestly don't think this is important at this stage of the
> >> > > > > > > > > > > >>>>>> discussion. It could have whatever interface we would deem to be best.
> >> > > > > > > > > > > >>>>>> Required things:
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> - access to at least a subset of metrics that the given
> >> > > > > > > > > > > >>>>>> `CheckpointTrigger` requests, for example via some registration
> >> > > > > > > > > > > >>>>>> mechanism, so we don't have to fetch all of the metrics all the time
> >> > > > > > > > > > > >>>>>> from TMs.
> >> > > > > > > > > > > >>>>>> - some way to influence `CheckpointCoordinator`. Either via manually
> >> > > > > > > > > > > >>>>>> triggering checkpoints, and/or ability to change the checkpointing
> >> > > > > > > > > > > >>>>>> interval.
> >> > > > > > > > > > > >>>>>>
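To make the two requirements above concrete, here is a minimal sketch of what such a plug-in could look like. Nothing here is an existing Flink interface: `CheckpointTrigger`, `requested_metrics`, `next_interval_ms` and `BackpressureTrigger` are hypothetical names, and returning the next interval is just one assumed way of influencing the checkpoint coordinator. The only name taken from the thread is the `backPressuredTimeMsPerSecond` metric.

```python
from abc import ABC, abstractmethod
from typing import Mapping, Set


class CheckpointTrigger(ABC):
    """Hypothetical JM-side plug-in; not an existing Flink interface."""

    @abstractmethod
    def requested_metrics(self) -> Set[str]:
        """Names of the metrics this trigger wants fetched from the TMs."""

    @abstractmethod
    def next_interval_ms(self, metrics: Mapping[str, float]) -> int:
        """Checkpointing interval to use, given the latest metric values."""


class BackpressureTrigger(CheckpointTrigger):
    """Uses the slow interval while the job looks backpressured."""

    def __init__(self, fast_interval_ms: int, slow_interval_ms: int,
                 threshold_ms_per_second: float) -> None:
        self.fast_interval_ms = fast_interval_ms
        self.slow_interval_ms = slow_interval_ms
        self.threshold_ms_per_second = threshold_ms_per_second

    def requested_metrics(self) -> Set[str]:
        # Registering the metric up front means only this subset is fetched.
        return {"backPressuredTimeMsPerSecond"}

    def next_interval_ms(self, metrics: Mapping[str, float]) -> int:
        backpressured = metrics.get("backPressuredTimeMsPerSecond", 0.0)
        if backpressured > self.threshold_ms_per_second:
            return self.slow_interval_ms
        return self.fast_interval_ms


# Example: 30 s normally, 10 min when backpressured more than 100 ms/s.
trigger = BackpressureTrigger(30_000, 600_000, 100.0)
print(trigger.next_interval_ms({"backPressuredTimeMsPerSecond": 250.0}))
```

How the per-subtask metric values would be aggregated into the single value passed in here is left open, as it is in the discussion.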
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the APIs due to the
> >> > > > > > > > > > > >>>>> following reasons.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> We would need to know the concrete APIs to gauge the following:
> >> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> > > > > > > > > > > >>>>> APIs of this alternative approach be more or less usable?
> >> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case (e.g. allow
> >> > > > > > > > > > > >>>>> checkpointing interval to change dynamically even during the unbounded
> >> > > > > > > > > > > >>>>> phase) as it claims?
> >> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently proposed in
> >> > > > > > > > > > > >>>>> FLIP-309?
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> For example, if the APIs of this alternative approach can be decoupled
> >> > > > > > > > > > > >>>>> from the APIs currently proposed in FLIP-309, then it might be reasonable
> >> > > > > > > > > > > >>>>> to work on this extra use-case with a more advanced/complicated design
> >> > > > > > > > > > > >>>>> separately in a followup work.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it going to
> >> > > > > > > > > > > >>>>>>> run on the subtask of every source operator? Or is it going to run on
> >> > > > > > > > > > > >>>>>>> the JM?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> IMO on the JM.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> - Are we going to provide a default implementation of this
> >> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the algorithm described
> >> > > > > > > > > > > >>>>>>> below, or do we expect each source operator developer to implement
> >> > > > > > > > > > > >>>>>>> their own CheckpointTrigger?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> As I mentioned before, I think we should provide at the very least the
> >> > > > > > > > > > > >>>>>> implementation that replaces the current triggering mechanism
> >> > > > > > > > > > > >>>>>> (statically configured checkpointing interval) and it would be great to
> >> > > > > > > > > > > >>>>>> provide the backpressure monitoring trigger as well.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I agree that if there is a good use-case that can be addressed by the
> >> > > > > > > > > > > >>>>> proposed CheckpointTrigger, then it is reasonable to add
> >> > > > > > > > > > > >>>>> CheckpointTrigger and replace the current triggering mechanism with it.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I also agree that we will likely find such a use-case. For example,
> >> > > > > > > > > > > >>>>> suppose the source records have event timestamps. Then it is likely that
> >> > > > > > > > > > > >>>>> we can use the trigger to dynamically control the checkpointing interval
> >> > > > > > > > > > > >>>>> based on the difference between the watermark and the current system
> >> > > > > > > > > > > >>>>> time.
> >> > > > > > > > > > > >>>>>
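As a rough illustration of the watermark-based idea above, the sketch below picks an interval from the lag between the current watermark and the system clock. The function name, its parameters and the single-threshold rule are all assumptions made for illustration; the thread does not specify how the lag would be turned into an interval.

```python
def interval_from_watermark_lag(watermark_ms: int, now_ms: int,
                                lag_threshold_ms: int,
                                fast_interval_ms: int,
                                slow_interval_ms: int) -> int:
    """Pick a checkpointing interval from the event-time lag (hypothetical rule)."""
    # How far event time is behind wall-clock time; clamp at zero in case the
    # watermark is ahead of the local clock.
    lag_ms = max(0, now_ms - watermark_ms)
    # A large lag suggests the job is still catching up on old data, so
    # checkpoint less often; otherwise use the regular (fast) interval.
    if lag_ms > lag_threshold_ms:
        return slow_interval_ms
    return fast_interval_ms


one_hour_ms = 60 * 60 * 1000
now = 1_700_000_000_000  # arbitrary example timestamp in epoch milliseconds

# Watermark 7 days behind the clock: use the slow interval.
print(interval_from_watermark_lag(now - 7 * 24 * one_hour_ms, now,
                                  one_hour_ms, 30_000, 600_000))
```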
> >> > > > > > > > > > > >>>>> But I am not sure the addition of this CheckpointTrigger should be
> >> > > > > > > > > > > >>>>> coupled with FLIP-309. Whether or not it is coupled probably depends on
> >> > > > > > > > > > > >>>>> the concrete API design around CheckpointTrigger.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>> If you would be adamant that the backpressure monitoring doesn't cover
> >> > > > > > > > > > > >>>>>> your use case well enough, I would be ok to provide the hacky version
> >> > > > > > > > > > > >>>>>> that I also mentioned before:
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>> """
> >> > > > > > > > > > > >>>>>> Especially since, if my proposed algorithm wouldn't work well enough,
> >> > > > > > > > > > > >>>>>> there is an obvious solution: any source could add a metric, like let's
> >> > > > > > > > > > > >>>>>> say "processingBacklog: true/false", and the `CheckpointTrigger` could
> >> > > > > > > > > > > >>>>>> use this as an override to always switch to the
> >> > > > > > > > > > > >>>>>> "slowCheckpointInterval". I don't think we need it, but that's always an
> >> > > > > > > > > > > >>>>>> option that would be basically equivalent to your original proposal.
> >> > > > > > > > > > > >>>>>> """
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> >> > > > > > > > > > > >>>>> - Have all source operators emit a metric named "processingBacklog".
> >> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the checkpointing interval to be
> >> > > > > > > > > > > >>>>> used when any source is processing backlog".
> >> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog" periodically from all source
> >> > > > > > > > > > > >>>>> operators and uses the newly added config value as appropriate.
> >> > > > > > > > > > > >>>>>
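The three steps listed above amount to a very small decision rule on the JM side. The following sketch shows that rule only; `choose_checkpoint_interval` and its parameter names are hypothetical, and how the flags are collected from the source operators is left out.

```python
from datetime import timedelta
from typing import Iterable


def choose_checkpoint_interval(processing_backlog_flags: Iterable[bool],
                               regular_interval: timedelta,
                               backlog_interval: timedelta) -> timedelta:
    """Return the backlog interval if any source reports processingBacklog.

    processing_backlog_flags holds the latest "processingBacklog" value
    collected from each source operator (hypothetical collection step).
    """
    if any(processing_backlog_flags):
        return backlog_interval
    return regular_interval


regular = timedelta(seconds=30)
during_backlog = timedelta(minutes=10)

# One of three sources is still processing backlog: use the longer interval.
print(choose_checkpoint_interval([False, True, False], regular, during_backlog))
```

With no sources at all, `any` returns False and the regular interval is kept, which seems the safer default for this sketch.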
> >> > > > > > > > > > > >>>>> The challenge with this approach is that we need to define the semantics
> >> > > > > > > > > > > >>>>> of this "processingBacklog" metric and have all source operators
> >> > > > > > > > > > > >>>>> implement this metric. I am not sure we are able to do this yet without
> >> > > > > > > > > > > >>>>> having users explicitly provide this information on a per-source basis.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source, should it emit
> >> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use the long
> >> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to process data starting
> >> > > > > > > > > > > >>>>> from now to the next 1 hour. If no, then the job might use the short
> >> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to re-process data
> >> > > > > > > > > > > >>>>> starting from 7 days ago.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> - How can users specify the
> >> > > > > > > > > > > >>>>>>
> fastCheckpointInterval/slowCheckpointInterval?
> >> > > > > > > > > > > >>>>>>> For example, will we provide APIs on the
> >> > > > > > CheckpointTrigger
> >> > > > > > > > that
> >> > > > > > > > > > > >>>>> end-users
> >> > > > > > > > > > > >>>>>>> can use to specify the checkpointing
> interval?
> >> > What
> >> > > > > would
> >> > > > > > > > that
> >> > > > > > > > > look
> >> > > > > > > > > > > >>>>> like?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Also as I mentioned before, just like metric
> >> > > reporters
> >> > > > > are
> >> > > > > > > > > configured:
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> >> > > > > > > > > > > >>>>>> Every CheckpointTrigger could have its own
> >> custom
> >> > > > > > > > configuration.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
> >> > > approach
> >> > > > > > based
> >> > > > > > > > on
> >> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Yes, as usual, more generic things are more
> >> > > > complicated,
> >> > > > > > but
> >> > > > > > > > > often
> >> > > > > > > > > > > >> more
> >> > > > > > > > > > > >>>>>> useful in the long run.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> and harder to use.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> I don't agree. Why setting in config
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> execution.checkpointing.trigger:
> >> > > > > > > > > > > >>>> BackPressureMonitoringCheckpointTrigger
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
> >> > > > > > > > > > > >>>>>> 1s
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
> >> > > > > > > > > > > >>>>>> 30s
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> that we could even provide a shortcut to the
> >> above
> >> > > > > > construct
> >> > > > > > > > > via:
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s
> >> > > > > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> is harder compared to setting two/three
> >> checkpoint
> >> > > > > > > intervals,
> >> > > > > > > > > one in
> >> > > > > > > > > > > >>>> the
> >> > > > > > > > > > > >>>>>> config/or via `env.enableCheckpointing(x)`,
> >> > > > > > > > > > > >>>>>> secondly passing one/two (fast/slow) values
> on
> >> the
> >> > > > > source
> >> > > > > > > > > itself?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> If we can address the use-case by providing
> just
> >> > the
> >> > > > two
> >> > > > > > > > > job-level
> >> > > > > > > > > > > >> config
> >> > > > > > > > > > > >>>>> as described above, I agree it will indeed be
> >> > > simpler.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I have tried to achieve this goal. But the
> >> caveat
> >> > is
> >> > > > that
> >> > > > > > it
> >> > > > > > > > > requires
> >> > > > > > > > > > > >>>> much
> >> > > > > > > > > > > >>>>> more work than described above in order to
> give
> >> the
> >> > > > > configs
> >> > > > > > > > > > > >> well-defined
> >> > > > > > > > > > > >>>>> semantics. So I find it simpler to just use
> the
> >> > > > approach
> >> > > > > in
> >> > > > > > > > > FLIP-309.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Let me explain my concern below. It will be
> >> great
> >> > if
> >> > > > you
> >> > > > > or
> >> > > > > > > > > someone
> >> > > > > > > > > > > >> else
> >> > > > > > > > > > > >>>>> can help provide a solution.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> 1) We need to clearly document when the
> >> > fast-interval
> >> > > > and
> >> > > > > > > > > slow-interval
> >> > > > > > > > > > > >>>>> will be used so that users can derive the
> >> expected
> >> > > > > behavior
> >> > > > > > > of
> >> > > > > > > > > the job
> >> > > > > > > > > > > >>>> and
> >> > > > > > > > > > > >>>>> be able to config these values.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> 2) The trigger of fast/slow interval depends
> on
> >> the
> >> > > > > > behavior
> >> > > > > > > of
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>> source
> >> > > > > > > > > > > >>>>> (e.g. MySQL CDC, HybridSource). However, no
> >> > existing
> >> > > > > > concepts
> >> > > > > > > > of
> >> > > > > > > > > source
> >> > > > > > > > > > > >>>>> operator (e.g. boundedness) can describe the
> >> target
> >> > > > > > behavior.
> >> > > > > > > > For
> >> > > > > > > > > > > >>>> example,
> >> > > > > > > > > > > >>>>> MySQL CDC internally has two phases, namely
> >> > snapshot
> >> > > > > phase
> >> > > > > > > and
> >> > > > > > > > > binlog
> >> > > > > > > > > > > >>>>> phase, which are not explicitly exposed to its
> >> > users
> >> > > > via
> >> > > > > > > source
> >> > > > > > > > > > > >> operator
> >> > > > > > > > > > > >>>>> API. And we probably should not enumerate all
> >> > > internal
> >> > > > > > phases
> >> > > > > > > > of
> >> > > > > > > > > all
> >> > > > > > > > > > > >>>> source
> >> > > > > > > > > > > >>>>> operators that are affected by fast/slow
> >> interval.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> 3) An alternative approach might be to define
> a
> >> new
> >> > > > > concept
> >> > > > > > > > (e.g.
> >> > > > > > > > > > > >>>>> processingBacklog) that is applied to all
> source
> >> > > > > operators.
> >> > > > > > > > Then
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>> fast/slow interval's documentation can depend
> on
> >> > this
> >> > > > > > > concept.
> >> > > > > > > > > That
> >> > > > > > > > > > > >> means
> >> > > > > > > > > > > >>>>> we have to add a top-level concept (similar to
> >> > source
> >> > > > > > > > > boundedness) and
> >> > > > > > > > > > > >>>>> require all source operators to specify how
> they
> >> > > > enforce
> >> > > > > > this
> >> > > > > > > > > concept
> >> > > > > > > > > > > >>>> (e.g.
> >> > > > > > > > > > > >>>>> FileSystemSource always emits
> >> > > processingBacklog=true).
> >> > > > > And
> >> > > > > > > > there
> >> > > > > > > > > might
> >> > > > > > > > > > > >> be
> >> > > > > > > > > > > >>>>> cases where the source itself (e.g. a bounded
> >> Kafka
> >> > > > > Source)
> >> > > > > > > can
> >> > > > > > > > > not
> >> > > > > > > > > > > >>>>> automatically derive the value of this
> concept,
> >> in
> >> > > > which
> >> > > > > > case
> >> > > > > > > > we
> >> > > > > > > > > need
> >> > > > > > > > > > > >> to
> >> > > > > > > > > > > >>>>> provide an option for users to explicitly specify
> >> the
> >> > > > value
> >> > > > > > for
> >> > > > > > > > this
> >> > > > > > > > > > > >> concept
> >> > > > > > > > > > > >>>>> on a per-source basis.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>> And it probably also has the issues of
> "having
> >> > two
> >> > > > > places
> >> > > > > > > to
> >> > > > > > > > > > > >>>> configure
> >> > > > > > > > > > > >>>>>> checkpointing
> >> > > > > > > > > > > >>>>>>> interval" and "giving flexibility for every
> >> > source
> >> > > to
> >> > > > > > > > > implement a
> >> > > > > > > > > > > >>>>>> different
> >> > > > > > > > > > > >>>>>>> API" (as mentioned below).
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> No, it doesn't.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> IMO, it is a hard-requirement for the
> >> user-facing
> >> > > API
> >> > > > > to
> >> > > > > > be
> >> > > > > > > > > > > >>>>>>> clearly defined and users should be able to
> >> use
> >> > the
> >> > > > API
> >> > > > > > > > without
> >> > > > > > > > > > > >>>> concern
> >> > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > >>>>>>> regression. And this requirement is more
> >> > important
> >> > > > than
> >> > > > > > the
> >> > > > > > > > > other
> >> > > > > > > > > > > >>>> goals
> >> > > > > > > > > > > >>>>>>> discussed above because it is related to the
> >> > > > > > > > > stability/performance of
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>> production job. What do you think?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> I don't agree with this. There are many
> things
> >> > that
> >> > > > work
> >> > > > > > > > > something in
> >> > > > > > > > > > > >>>>>> between perfectly and well enough
> >> > > > > > > > > > > >>>>>> in some fraction of use cases (maybe in 99%,
> >> maybe
> >> > > 95%
> >> > > > > or
> >> > > > > > > > maybe
> >> > > > > > > > > 60%),
> >> > > > > > > > > > > >>>>> while
> >> > > > > > > > > > > >>>>>> still being very useful.
> >> > > > > > > > > > > >>>>>> Good examples are: selection of state
> backend,
> >> > > > unaligned
> >> > > > > > > > > checkpoints,
> >> > > > > > > > > > > >>>>>> buffer debloating but frankly if I go
> >> > > > > > > > > > > >>>>>> through the list of currently available config
> >> > options,
> >> > > > > > > something
> >> > > > > > > > > like
> >> > > > > > > > > > > >> half
> >> > > > > > > > > > > >>>>> of
> >> > > > > > > > > > > >>>>>> them can cause regressions. Heck,
> >> > > > > > > > > > > >>>>>> even Flink itself doesn't work perfectly in
> >> 100%
> >> > of
> >> > > > the
> >> > > > > > use
> >> > > > > > > > > cases, due
> >> > > > > > > > > > > >>>>> to a
> >> > > > > > > > > > > >>>>>> variety of design choices. Of
> >> > > > > > > > > > > >>>>>> course, the more use cases are fine with said
> >> > > feature,
> >> > > > > the
> >> > > > > > > > > better, but
> >> > > > > > > > > > > >>>> we
> >> > > > > > > > > > > >>>>>> shouldn't fixate on perfectly covering
> >> > > > > > > > > > > >>>>>> 100% of the cases, as that's impossible.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> In this particular case, if back pressure
> >> > monitoring
> >> > > > > > > trigger
> >> > > > > > > > > can work
> >> > > > > > > > > > > >>>>> well
> >> > > > > > > > > > > >>>>>> enough in 95% of cases, I would
> >> > > > > > > > > > > >>>>>> say that's already better than the originally
> >> > > proposed
> >> > > > > > > > > alternative,
> >> > > > > > > > > > > >>>> which
> >> > > > > > > > > > > >>>>>> doesn't work at all if the user has a large
> >> > > > > > > > > > > >>>>>> backlog to reprocess from Kafka, including
> when
> >> > > using
> >> > > > > > > > > HybridSource
> >> > > > > > > > > > > >>>> AFTER
> >> > > > > > > > > > > >>>>>> the switch to Kafka has
> >> > > > > > > > > > > >>>>>> happened. For the remaining 5%, we should try
> >> to
> >> > > > improve
> >> > > > > > the
> >> > > > > > > > > behaviour
> >> > > > > > > > > > > >>>>> over
> >> > > > > > > > > > > >>>>>> time, but ultimately, users can
> >> > > > > > > > > > > >>>>>> decide to just run a fixed checkpoint
> interval
> >> (or
> >> > > at
> >> > > > > > worst
> >> > > > > > > > use
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>> hacky
> >> > > > > > > > > > > >>>>>> checkpoint trigger that I mentioned
> >> > > > > > > > > > > >>>>>> before a couple of times).
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Also to be pedantic, if a user naively
> selects
> >> > > > > > slow-interval
> >> > > > > > > > in
> >> > > > > > > > > your
> >> > > > > > > > > > > >>>>>> proposal to 30 minutes, when that user's
> >> > > > > > > > > > > >>>>>> job fails on average every 15-20 minutes, his
> >> job
> >> > can
> >> > > > end
> >> > > > > > up
> >> > > > > > > in
> >> > > > > > > > > a state
> >> > > > > > > > > > > >>>>> that
> >> > > > > > > > > > > >>>>>> it can not make any progress,
> >> > > > > > > > > > > >>>>>> this arguably is quite a serious regression.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I probably should not say it is "hard
> >> requirement".
> >> > > > After
> >> > > > > > all
> >> > > > > > > > > there are
> >> > > > > > > > > > > >>>>> pros/cons. We will need to consider
> >> implementation
> >> > > > > > > complexity,
> >> > > > > > > > > > > >> usability,
> >> > > > > > > > > > > >>>>> extensibility etc.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I just don't think we should take it for
> >> granted to
> >> > > > > > introduce
> >> > > > > > > > > > > >> regression
> >> > > > > > > > > > > >>>>> for one use-case in order to support another
> >> > > use-case.
> >> > > > If
> >> > > > > > we
> >> > > > > > > > can
> >> > > > > > > > > not
> >> > > > > > > > > > > >> find
> >> > > > > > > > > > > >>>>> an algorithm/solution that addresses
> >> > > > > > > > > > > >>>>> both use-case well, I hope we can be open to
> >> tackle
> >> > > > them
> >> > > > > > > > > separately so
> >> > > > > > > > > > > >>>> that
> >> > > > > > > > > > > >>>>> users can choose the option that best fits
> their
> >> > > needs.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> All things else being equal, I think it is
> >> > preferred
> >> > > > for
> >> > > > > > > > > user-facing
> >> > > > > > > > > > > >> API
> >> > > > > > > > > > > >>>> to
> >> > > > > > > > > > > >>>>> be clearly defined and users should be
> able
> >> to
> >> > > use
> >> > > > > the
> >> > > > > > > API
> >> > > > > > > > > without
> >> > > > > > > > > > > >>>>> concern of regression.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Maybe we can list pros/cons for the
> alternative
> >> > > > > approaches
> >> > > > > > we
> >> > > > > > > > > have been
> >> > > > > > > > > > > >>>>> discussing and choose the best approach.
> And
> >> > > maybe
> >> > > > we
> >> > > > > > > will
> >> > > > > > > > > end up
> >> > > > > > > > > > > >>>>> finding that use-case
> >> > > > > > > > > > > >>>>> which needs CheckpointTrigger can be tackled
> >> > > separately
> >> > > > > > from
> >> > > > > > > > the
> >> > > > > > > > > > > >> use-case
> >> > > > > > > > > > > >>>>> in FLIP-309.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
> >> > > > > > > > > > > >>>>> backPressuredTimeMsPerSecond
> >> > > > > > > > > > > >>>>>> =
> >> > > > > > > > > > > >>>>>>> 0, then
> maxRecordsConsumedWithoutBackpressure
> >> =
> >> > > > > > > > > > > >>>> numRecordsInPerSecond /
> >> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to
> the
> >> > above
> >> > > > > > > > algorithm.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Do you mean
> >> > "maxRecordsConsumedWithoutBackpressure
> >> > > =
> >> > > > > > > > > > > >>>>>> (numRecordsInPerSecond
> >> > > > > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond /
> 1000)) *
> >> > > > > > > > > > > >>>> metricsUpdateInterval"?
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> It looks like there is indeed some mistake in
> >> my
> >> > > > > proposal
> >> > > > > > > > > above. Yours
> >> > > > > > > > > > > >>>>> look
> >> > > > > > > > > > > >>>>>> more correct, it probably
> >> > > > > > > > > > > >>>>>> still needs some safeguard/special handling
> if
> >> > > > > > > > > > > >>>>>> `backPressuredTimeMsPerSecond > 950`
> >> > > > > > > > > > > >>>>>>
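For illustration only, the corrected estimate discussed above could be sketched in Java as follows. The class and method names are hypothetical, metricsUpdateInterval is assumed to be expressed in seconds, and clamping the input at 950 ms/s is just one possible form of the safeguard mentioned above, not something the thread settles on.

```java
final class BackpressureEstimate {
    // Assumed cap in ms/s; beyond it the divisor gets too close to zero.
    static final double MAX_BACKPRESSURED_MS_PER_SECOND = 950.0;

    // Estimates how many records could be consumed during one metrics update
    // interval if the subtask were never backpressured.
    static double maxRecordsConsumedWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds) {
        // Clamp into [0, 950] so the fraction below stays at or above 0.05.
        double clamped = Math.min(
                Math.max(backPressuredTimeMsPerSecond, 0.0),
                MAX_BACKPRESSURED_MS_PER_SECOND);
        double fractionNotBackPressured = 1.0 - clamped / 1000.0;
        return numRecordsInPerSecond / fractionNotBackPressured
                * metricsUpdateIntervalSeconds;
    }
}
```

With backPressuredTimeMsPerSecond = 0 this reduces to numRecordsInPerSecond * metricsUpdateInterval, and at 500 ms/s the estimate doubles, since the subtask was only able to consume records for half of the time.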
> >> > > > > > > > > > > >>>>>>> The only information it can access is the
> >> > backlog.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Again no. It can access whatever we want to
> >> > provide
> >> > > to
> >> > > > > it.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Regarding the rest of your concerns. It's a
> >> matter
> >> > > of
> >> > > > > > > tweaking
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>> parameters and the algorithm itself,
> >> > > > > > > > > > > >>>>>> and how much of a safety net we want to have.
> >> > > > Ultimately,
> >> > > > > > I'm
> >> > > > > > > > > pretty
> >> > > > > > > > > > > >> sure
> >> > > > > > > > > > > >>>>>> that's a (for 95-99% of cases)
> >> > > > > > > > > > > >>>>>> solvable problem. If not, there is always the
> >> > hacky
> >> > > > > > > solution,
> >> > > > > > > > > that
> >> > > > > > > > > > > >>>> could
> >> > > > > > > > > > > >>>>> be
> >> > > > > > > > > > > >>>>>> even integrated into this above
> >> > > > > > > > > > > >>>>>> mentioned algorithm as a short circuit to
> >> always
> >> > > reach
> >> > > > > > > > > > > >> `slow-interval`.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Apart from that, you picked 3 minutes as the
> >> > > > checkpointing
> >> > > > > > > > > interval in
> >> > > > > > > > > > > >>>> your
> >> > > > > > > > > > > >>>>>> counter example. In most cases
> >> > > > > > > > > > > >>>>>> any interval above 1 minute would inflict
> >> pretty
> >> > > > > > negligible
> >> > > > > > > > > overheads,
> >> > > > > > > > > > > >>>> so
> >> > > > > > > > > > > >>>>>> all in all, I would doubt there is
> >> > > > > > > > > > > >>>>>> a significant benefit (in most cases) of
> >> > increasing
> >> > > 3
> >> > > > > > minute
> >> > > > > > > > > > > >> checkpoint
> >> > > > > > > > > > > >>>>>> interval to anything more, let alone
> >> > > > > > > > > > > >>>>>> 30 minutes.
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I am not sure we should design the algorithm
> >> with
> >> > the
> >> > > > > > > > assumption
> >> > > > > > > > > that
> >> > > > > > > > > > > >> the
> >> > > > > > > > > > > >>>>> short checkpointing interval will always be
> >> higher
> >> > > > than 1
> >> > > > > > > > minute
> >> > > > > > > > > etc.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> I agree the proposed algorithm can solve most
> >> cases
> >> > > > where
> >> > > > > > the
> >> > > > > > > > > resource
> >> > > > > > > > > > > >> is
> >> > > > > > > > > > > >>>>> sufficient and there is never any backlog in
> >> source
> >> > > > > > subtasks.
> >> > > > > > > > On
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>> other
> >> > > > > > > > > > > >>>>> hand, what makes SRE
> >> > > > > > > > > > > >>>>> life hard is probably the remaining 1-5% cases
> >> > where
> >> > > > the
> >> > > > > > > > traffic
> >> > > > > > > > > is
> >> > > > > > > > > > > >> spiky
> >> > > > > > > > > > > >>>>> and the cluster is reaching its capacity
> limit.
> >> The
> >> > > > > ability
> >> > > > > > > to
> >> > > > > > > > > predict
> >> > > > > > > > > > > >>>> and
> >> > > > > > > > > > > >>>>> control Flink job's behavior (including
> >> > checkpointing
> >> > > > > > > interval)
> >> > > > > > > > > can
> >> > > > > > > > > > > >>>>> considerably reduce the burden of managing
> Flink
> >> > jobs.
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>> Best,
> >> > > > > > > > > > > >>>>> Dong
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> Best,
> >> > > > > > > > > > > >>>>>> Piotrek
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <
> >> > > > lindon...@gmail.com>
> >> > > > > > > > > napisał(a):
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>>> Hi Piotr,
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Thanks for the explanations. I have some
> >> followup
> >> > > > > > questions
> >> > > > > > > > > below.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr
> >> Nowojski <
> >> > > > > > > > > pnowoj...@apache.org
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>>>> wrote:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>> Hi All,
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Thanks for chipping in to the discussion
> Ahmed!
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Regarding using the REST API. Currently I'm
> >> > > leaning
> >> > > > > > > towards
> >> > > > > > > > > > > >>>>>> implementing
> >> > > > > > > > > > > >>>>>>>> this feature inside Flink itself, via
> >> some
> >> > > > > pluggable
> >> > > > > > > > > interface.
> >> > > > > > > > > > > >>>>>>>> REST API solution would be tempting, but I
> >> guess
> >> > > not
> >> > > > > > > > everyone
> >> > > > > > > > > is
> >> > > > > > > > > > > >>>>> using
> >> > > > > > > > > > > >>>>>>>> Flink Kubernetes Operator.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> @Dong
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> I am not sure metrics such as
> >> isBackPressured
> >> > are
> >> > > > > > already
> >> > > > > > > > > sent to
> >> > > > > > > > > > > >>>>> JM.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Fetching code path on the JM:
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> >> > > > > > > > > > > >>>>>>>>
> >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Example code path accessing Task level
> >> metrics
> >> > via
> >> > > > JM
> >> > > > > > > using
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>>>> `MetricStore`:
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Thanks for the code reference. I checked the
> >> code
> >> > > > that
> >> > > > > > > > invoked
> >> > > > > > > > > these
> >> > > > > > > > > > > >>>>> two
> >> > > > > > > > > > > >>>>>>> classes and found the following information:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> -
> >> AggregatingSubtasksMetricsHandler#getStores is
> >> > > > > currently
> >> > > > > > > > > invoked
> >> > > > > > > > > > > >>>> only
> >> > > > > > > > > > > >>>>>>> when AggregatingJobsMetricsHandler is
> invoked.
> >> > > > > > > > > > > >>>>>>> - AggregatingJobsMetricsHandler is only
> >> > > instantiated
> >> > > > > and
> >> > > > > > > > > returned by
> >> > > > > > > > > > > >>>>>>> WebMonitorEndpoint#initializeHandlers
> >> > > > > > > > > > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is
> >> only
> >> > > used
> >> > > > by
> >> > > > > > > > > > > >>>>>> RestServerEndpoint.
> >> > > > > > > > > > > >>>>>>> And RestServerEndpoint invokes these
> handlers
> >> in
> >> > > > > response
> >> > > > > > > to
> >> > > > > > > > > external
> >> > > > > > > > > > > >>>>>> REST
> >> > > > > > > > > > > >>>>>>> request.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I understand that JM will get the
> >> > > > backpressure-related
> >> > > > > > > > metrics
> >> > > > > > > > > every
> >> > > > > > > > > > > >>>>> time
> >> > > > > > > > > > > >>>>>>> the RestServerEndpoint receives the REST
> >> request
> >> > to
> >> > > > get
> >> > > > > > > these
> >> > > > > > > > > > > >>>> metrics.
> >> > > > > > > > > > > >>>>>> But
> >> > > > > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is
> already
> >> > > always
> >> > > > > > > > > receiving the
> >> > > > > > > > > > > >>>>> REST
> >> > > > > > > > > > > >>>>>>> metrics at regular interval (suppose there
> is
> >> no
> >> > > > human
> >> > > > > > > > manually
> >> > > > > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if
> it
> >> > does,
> >> > > > > what
> >> > > > > > is
> >> > > > > > > > the
> >> > > > > > > > > > > >>>>> interval?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>> For example, let's say every source
> operator
> >> > > > subtask
> >> > > > > > > > reports
> >> > > > > > > > > this
> >> > > > > > > > > > > >>>>>>> metric
> >> > > > > > > > > > > >>>>>>>> to
> >> > > > > > > > > > > >>>>>>>>> JM once every 10 seconds. There are 100
> >> source
> >> > > > > > subtasks.
> >> > > > > > > > And
> >> > > > > > > > > each
> >> > > > > > > > > > > >>>>>>> subtask
> >> > > > > > > > > > > >>>>>>>>> is backpressured roughly 10% of the total
> >> time
> >> > > due
> >> > > > to
> >> > > > > > > > traffic
> >> > > > > > > > > > > >>>>> spikes
> >> > > > > > > > > > > >>>>>>> (and
> >> > > > > > > > > > > >>>>>>>>> limited buffer). Then at any given time,
> >> there
> >> > > are
> >> > > > 1
> >> > > > > -
> >> > > > > > > > > 0.9^100 =
> >> > > > > > > > > > > >>>>>>> 99.997%
> >> > > > > > > > > > > >>>>>>>>> chance that there is at least one subtask
> >> that
> >> > is
> >> > > > > > > > > backpressured.
> >> > > > > > > > > > > >>>>> Then
> >> > > > > > > > > > > >>>>>>> we
> >> > > > > > > > > > > >>>>>>>>> have to wait for at least 10 seconds to
> >> check
> >> > > > again.
> >> > > > > > > > > > > >>>>>>>>
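As a quick check of the arithmetic in the quoted example, here is a minimal Java sketch. The class and method names are hypothetical, and it assumes, as the example implicitly does, that subtasks are backpressured independently of each other.

```java
final class BackpressureOdds {
    // Probability that at least one of `subtasks` subtasks is backpressured
    // at a given instant, assuming each one is independently backpressured
    // for `perSubtaskFraction` of the time.
    static double probabilityAnySubtaskBackpressured(
            double perSubtaskFraction, int subtasks) {
        return 1.0 - Math.pow(1.0 - perSubtaskFraction, subtasks);
    }
}
```

Calling probabilityAnySubtaskBackpressured(0.1, 100) evaluates 1 - 0.9^100, which is the roughly 99.997% figure quoted above.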
> >> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond and other
> >> related
> >> > > > metrics
> >> > > > > > > (like
> >> > > > > > > > > > > >>>>>>>> busyTimeMsPerSecond) are not subject to
> that
> >> > > > problem.
> >> > > > > > > > > > > >>>>>>>> They are recalculated once every metric
> >> fetching
> >> > > > > > interval,
> >> > > > > > > > > and they
> >> > > > > > > > > > > >>>>>>> report
> >> > > > > > > > > > > >>>>>>>> accurately how much time, on average, the given subtask
> spent
> >> > > > > > > > > > > >>>>>> busy/idling/backpressured.
> >> > > > > > > > > > > >>>>>>>> In your example,
> backPressuredTimeMsPerSecond
> >> > > would
> >> > > > > > report
> >> > > > > > > > > 100ms/s.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Suppose every subtask is already reporting
> >> > > > > > > > > > > >>>> backPressuredTimeMsPerSecond
> >> > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > >>>>>>> JM once every 100 ms. If a job has 10
> >> operators
> >> > > (that
> >> > > > > are
> >> > > > > > > not
> >> > > > > > > > > > > >>>> chained)
> >> > > > > > > > > > > >>>>>> and
> >> > > > > > > > > > > >>>>>>> each operator has 100 subtasks, then JM
> would
> >> > need
> >> > > to
> >> > > > > > > handle
> >> > > > > > > > > 10000
> >> > > > > > > > > > > >>>>>> requests
> >> > > > > > > > > > > >>>>>>> per second to receive metrics from these
> 1000
> >> > > > subtasks.
> >> > > > > > It
> >> > > > > > > > > seems
> >> > > > > > > > > > > >>>> like a
> >> > > > > > > > > > > >>>>>>> non-trivial overhead for medium-to-large
> sized
> >> > jobs
> >> > > > and
> >> > > > > > can
> >> > > > > > > > > make JM
> >> > > > > > > > > > > >>>> the
> >> > > > > > > > > > > >>>>>>> performance bottleneck during job execution.
> >> > > > > > > > > > > >>>>>>>
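The load estimate above can be reproduced with a small Java sketch. The class and method names are hypothetical, and it assumes one request per subtask per report, which is the premise of the estimate and not necessarily how Flink batches metric transfers in practice.

```java
final class MetricsLoad {
    // Number of metric reports the JM would receive per second if every
    // subtask of every (unchained) operator reported once per interval.
    static long reportsPerSecond(
            int operators, int subtasksPerOperator, long reportIntervalMs) {
        long subtasks = (long) operators * subtasksPerOperator;
        return subtasks * 1000L / reportIntervalMs;
    }
}
```

For 10 operators with 100 subtasks each and a 100 ms interval, this gives 1000 subtasks times 10 reports per second, i.e. the 10000 requests per second mentioned above.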
> >> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already
> >> paying
> >> > > this
> >> > > > > much
> >> > > > > > > > > overhead
> >> > > > > > > > > > > >>>> just
> >> > > > > > > > > > > >>>>>> for
> >> > > > > > > > > > > >>>>>>> metrics monitoring. That is the main reason
> I
> >> > still
> >> > > > > doubt
> >> > > > > > > it
> >> > > > > > > > > is true.
> >> > > > > > > > > > > >>>>> Can
> >> > > > > > > > > > > >>>>>>> you show where this 100 ms is currently
> >> > configured?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should
> >> add
> >> > > > extra
> >> > > > > > code
> >> > > > > > > > to
> >> > > > > > > > > invoke
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>> REST API at 100 ms interval. Then that means
> >> we
> >> > > need
> >> > > > to
> >> > > > > > > > > considerably
> >> > > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM,
> where
> >> > the
> >> > > > > > overhead
> >> > > > > > > > > will
> >> > > > > > > > > > > >>>>> increase
> >> > > > > > > > > > > >>>>>>> as the number of TM/slots increases, which
> may
> >> > pose
> >> > > > risk
> >> > > > > > to
> >> > > > > > > > the
> >> > > > > > > > > > > >>>>>> scalability
> >> > > > > > > > > > > >>>>>>> of the proposed design. I am not sure we
> >> should
> >> > do
> >> > > > > this.
> >> > > > > > > What
> >> > > > > > > > > do you
> >> > > > > > > > > > > >>>>>> think?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> While it will be nice to support
> additional
> >> > > > use-cases
> >> > > > > > > > > > > >>>>>>>>> with one proposal, it is probably also
> >> > reasonable
> >> > > > to
> >> > > > > > make
> >> > > > > > > > > > > >>>>> incremental
> >> > > > > > > > > > > >>>>>>>>> progress and support the low-hanging-fruit
> >> > > use-case
> >> > > > > > > first.
> >> > > > > > > > > The
> >> > > > > > > > > > > >>>>> choice
> >> > > > > > > > > > > >>>>>>>>> really depends on the complexity and the
> >> > > importance
> >> > > > > of
> >> > > > > > > > > supporting
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>>> extra
> >> > > > > > > > > > > >>>>>>>>> use-cases.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> That would be true, if that was a private
> >> > > > > implementation
> >> > > > > > > > > detail or
> >> > > > > > > > > > > >>>> if
> >> > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > >>>>>>>> low-hanging-fruit-solution would be on the
> >> > direct
> >> > > > path
> >> > > > > > to
> >> > > > > > > > the
> >> > > > > > > > > final
> >> > > > > > > > > > > >>>>>>>> solution.
> >> > > > > > > > > > > >>>>>>>> That's unfortunately not the case here.
> This
> >> > will
> >> > > > add
> >> > > > > > > public
> >> > > > > > > > > facing
> >> > > > > > > > > > > >>>>>> API,
> >> > > > > > > > > > > >>>>>>>> that we will later need to maintain, no
> >> matter
> >> > > what
> >> > > > > the
> >> > > > > > > > final
> >> > > > > > > > > > > >>>>> solution
> >> > > > > > > > > > > >>>>>>> will
> >> > > > > > > > > > > >>>>>>>> be,
> >> > > > > > > > > > > >>>>>>>> and at the moment at least I don't see it
> >> being
> >> > > > > related
> >> > > > > > > to a
> >> > > > > > > > > > > >>>>> "perfect"
> >> > > > > > > > > > > >>>>>>>> solution.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Sure. Then let's decide the final solution
> >> first.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>> I guess the point is that the suggested
> >> > approach,
> >> > > > > which
> >> > > > > > > > > > > >>>> dynamically
> >> > > > > > > > > > > >>>>>>>>> determines the checkpointing interval
> based
> >> on
> >> > > the
> >> > > > > > > > > backpressure,
> >> > > > > > > > > > > >>>>> may
> >> > > > > > > > > > > >>>>>>>> cause
> >> > > > > > > > > > > >>>>>>>>> regression when the checkpointing interval
> >> is
> >> > > > > > relatively
> >> > > > > > > > low.
> >> > > > > > > > > > > >>>> This
> >> > > > > > > > > > > >>>>>>> makes
> >> > > > > > > > > > > >>>>>>>> it
> >> > > > > > > > > > > >>>>>>>>> hard for users to enable this feature in
> >> > > > production.
> >> > > > > It
> >> > > > > > > is
> >> > > > > > > > > like
> >> > > > > > > > > > > >>>> an
> >> > > > > > > > > > > >>>>>>>>> auto-driving system that is not guaranteed
> >> to
> >> > > work
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Yes, creating a more generic solution that
> >> would
> >> > > > > require
> >> > > > > > > > less
> >> > > > > > > > > > > >>>>>>> configuration
> >> > > > > > > > > > > >>>>>>>> is usually more difficult than static
> >> > > > configurations.
> >> > > > > > > > > > > >>>>>>>> It doesn't mean we shouldn't try to do
> that.
> >> > > > > Especially
> >> > > > > > > that
> >> > > > > > > > > if my
> >> > > > > > > > > > > >>>>>>> proposed
> >> > > > > > > > > > > >>>>>>>> algorithm wouldn't work well enough, there
> is
> >> > > > > > > > > > > >>>>>>>> an obvious solution, that any source could
> >> add a
> >> > > > > metric,
> >> > > > > > > > like
> >> > > > > > > > > let
> >> > > > > > > > > > > >>>> say
> >> > > > > > > > > > > >>>>>>>> "processingBacklog: true/false", and the
> >> > > > > > > `CheckpointTrigger`
> >> > > > > > > > > > > >>>>>>>> could use this as an override to always
> >> switch
> >> > to
> >> > > > the
> >> > > > > > > > > > > >>>>>>>> "slowCheckpointInterval". I don't think we
> >> need
> >> > > it,
> >> > > > > but
> >> > > > > > > > that's
> >> > > > > > > > > > > >>>> always
> >> > > > > > > > > > > >>>>>> an
> >> > > > > > > > > > > >>>>>>>> option
> >> > > > > > > > > > > >>>>>>>> that would be basically equivalent to your
> >> > > original
> >> > > > > > > > proposal.
> >> > > > > > > > > Or
> >> > > > > > > > > > > >>>> even
> >> > > > > > > > > > > >>>>>>>> source could add
> >> "suggestedCheckpointInterval :
> >> > > > int",
> >> > > > > > and
> >> > > > > > > > > > > >>>>>>>> `CheckpointTrigger` could use that value if
> >> > > present
> >> > > > > as a
> >> > > > > > > > hint
> >> > > > > > > > > in
> >> > > > > > > > > > > >>>> one
> >> > > > > > > > > > > >>>>>> way
> >> > > > > > > > > > > >>>>>>> or
> >> > > > > > > > > > > >>>>>>>> another.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> So far we have talked about the possibility
> of
> >> > > using
> >> > > > > > > > > > > >>>> CheckpointTrigger
> >> > > > > > > > > > > >>>>>> and
> >> > > > > > > > > > > >>>>>>> mentioned the CheckpointTrigger
> >> > > > > > > > > > > >>>>>>> and read metric values.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Can you help answer the following questions
> so
> >> > > that I
> >> > > > > can
> >> > > > > > > > > understand
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>> alternative solution more concretely:
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> - What is the interface of this
> >> > CheckpointTrigger?
> >> > > > For
> >> > > > > > > > > example, are
> >> > > > > > > > > > > >>>> we
> >> > > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context
> >> that it
> >> > > can
> >> > > > > use
> >> > > > > > > to
> >> > > > > > > > > fetch
> >> > > > > > > > > > > >>>>>>> arbitrary metric values? This can help us
> >> > > understand
> >> > > > > what
> >> > > > > > > > > information
> >> > > > > > > > > > > >>>>>> this
> >> > > > > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to
> make
> >> > the
> >> > > > > > > checkpoint
> >> > > > > > > > > > > >>>> decision.
> >> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running?
> For
> >> > > > example,
> >> > > > > > is
> >> > > > > > > it
> >> > > > > > > > > going
> >> > > > > > > > > > > >>>> to
> >> > > > > > > > > > > >>>>>> run
> >> > > > > > > > > > > >>>>>>> on the subtask of every source operator? Or
> >> is it
> >> > > > going
> >> > > > > > to
> >> > > > > > > > run
> >> > > > > > > > > on the
> >> > > > > > > > > > > >>>>> JM?
> >> > > > > > > > > > > >>>>>>> - Are we going to provide a default
> >> > implementation
> >> > > of
> >> > > > > > this
> >> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements
> the
> >> > > > > algorithm
> >> > > > > > > > > described
> >> > > > > > > > > > > >>>>> below,
> >> > > > > > > > > > > >>>>>>> or do we expect each source operator
> >> developer to
> >> > > > > > implement
> >> > > > > > > > > their own
> >> > > > > > > > > > > >>>>>>> CheckpointTrigger?
> >> > > > > > > > > > > >>>>>>> - How can users specify the
> >> > > > > > > > > > > >>>>>>
> fastCheckpointInterval/slowCheckpointInterval?
> >> > > > > > > > > > > >>>>>>> For example, will we provide APIs on the
> >> > > > > > CheckpointTrigger
> >> > > > > > > > that
> >> > > > > > > > > > > >>>>> end-users
> >> > > > > > > > > > > >>>>>>> can use to specify the checkpointing
> interval?
> >> > What
> >> > > > > would
> >> > > > > > > > that
> >> > > > > > > > > look
> >> > > > > > > > > > > >>>>> like?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
> >> > > approach
> >> > > > > > based
> >> > > > > > > > on
> >> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated and
> >> harder
> >> > to
> >> > > > > use.
> >> > > > > > > And
> >> > > > > > > > it
> >> > > > > > > > > > > >>>>> probably
> >> > > > > > > > > > > >>>>>>> also has the issues of "having two places to
> >> > > > configure
> >> > > > > > > > > checkpointing
> >> > > > > > > > > > > >>>>>>> interval" and "giving flexibility for every
> >> > source
> >> > > to
> >> > > > > > > > > implement a
> >> > > > > > > > > > > >>>>>> different
> >> > > > > > > > > > > >>>>>>> API" (as mentioned below).
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Maybe we can evaluate it more after knowing
> >> the
> >> > > > answers
> >> > > > > > to
> >> > > > > > > > the
> >> > > > > > > > > above
> >> > > > > > > > > > > >>>>>>> questions.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> On the other hand, the approach currently
> >> > > proposed
> >> > > > in
> >> > > > > > the
> >> > > > > > > > > FLIP is
> >> > > > > > > > > > > >>>>>> much
> >> > > > > > > > > > > >>>>>>>>> simpler as it does not depend on
> >> backpressure.
> >> > > > Users
> >> > > > > > > > specify
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>> extra
> >> > > > > > > > > > > >>>>>>>>> interval requirement on the specific
> sources
> >> > > (e.g.
> >> > > > > > > > > HybridSource,
> >> > > > > > > > > > > >>>>>> MySQL
> >> > > > > > > > > > > >>>>>>>> CDC
> >> > > > > > > > > > > >>>>>>>>> Source) and can easily know the
> >> checkpointing
> >> > > > > interval
> >> > > > > > > will
> >> > > > > > > > > be
> >> > > > > > > > > > > >>>> used
> >> > > > > > > > > > > >>>>>> on
> >> > > > > > > > > > > >>>>>>>> the
> >> > > > > > > > > > > >>>>>>>>> continuous phase of the corresponding
> >> source.
> >> > > This
> >> > > > is
> >> > > > > > > > pretty
> >> > > > > > > > > much
> >> > > > > > > > > > > >>>>>> same
> >> > > > > > > > > > > >>>>>>> as
> >> > > > > > > > > > > >>>>>>>>> how users use the existing
> >> > > > > > > execution.checkpointing.interval
> >> > > > > > > > > > > >>>> config.
> >> > > > > > > > > > > >>>>>> So
> >> > > > > > > > > > > >>>>>>>>> there is no extra concern of regression
> >> caused
> >> > by
> >> > > > > this
> >> > > > > > > > > approach.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> To an extent, but as I have already
> >> previously
> >> > > > > > mentioned I
> >> > > > > > > > > really
> >> > > > > > > > > > > >>>>>> really
> >> > > > > > > > > > > >>>>>>> do
> >> > > > > > > > > > > >>>>>>>> not like the idea of:
> >> > > > > > > > > > > >>>>>>>> - having two places to configure
> >> checkpointing
> >> > > > > interval
> >> > > > > > > > > (config
> >> > > > > > > > > > > >>>>> file
> >> > > > > > > > > > > >>>>>>> and
> >> > > > > > > > > > > >>>>>>>> in the Source builders)
> >> > > > > > > > > > > >>>>>>>> - giving flexibility for every source to
> >> > > implement a
> >> > > > > > > > different
> >> > > > > > > > > > > >>>> API
> >> > > > > > > > > > > >>>>>> for
> >> > > > > > > > > > > >>>>>>>> that purpose
> >> > > > > > > > > > > >>>>>>>> - creating a solution that is not generic
> >> > enough,
> >> > > so
> >> > > > > > that
> >> > > > > > > we
> >> > > > > > > > > will
> >> > > > > > > > > > > >>>>>> need
> >> > > > > > > > > > > >>>>>>> a
> >> > > > > > > > > > > >>>>>>>> completely different mechanism in the
> future
> >> > > anyway
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Yeah, I understand different developers
> might
> >> > have
> >> > > > > > > different
> >> > > > > > > > > > > >>>>>>> concerns/tastes for these APIs. Ultimately,
> >> there
> >> > > > might
> >> > > > > > not
> >> > > > > > > > be
> >> > > > > > > > > a
> >> > > > > > > > > > > >>>>> perfect
> >> > > > > > > > > > > >>>>>>> solution and we have to choose based on the
> >> > > pros/cons
> >> > > > > of
> >> > > > > > > > these
> >> > > > > > > > > > > >>>>> solutions.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I agree with you that, all things being
> >> equal, it
> >> > > is
> >> > > > > > > > > preferable to 1)
> >> > > > > > > > > > > >>>>>> have
> >> > > > > > > > > > > >>>>>>> one place to configure checkpointing
> >> intervals,
> >> > 2)
> >> > > > have
> >> > > > > > all
> >> > > > > > > > > source
> >> > > > > > > > > > > >>>>>>> operators use the same API, and 3) create a
> >> > > solution
> >> > > > > that
> >> > > > > > > is
> >> > > > > > > > > generic
> >> > > > > > > > > > > >>>>> and
> >> > > > > > > > > > > >>>>>>> long lasting. Note that these three goals
> >> affect
> >> > > the
> >> > > > > > > > > usability and
> >> > > > > > > > > > > >>>>>>> extensibility of the API, but not
> necessarily
> >> the
> >> > > > > > > > > > > >>>> stability/performance
> >> > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > >>>>>>> the production job.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> BTW, there are also other preferable goals.
> >> For
> >> > > > > example,
> >> > > > > > > it
> >> > > > > > > > > is very
> >> > > > > > > > > > > >>>>>> useful
> >> > > > > > > > > > > >>>>>>> for the job's behavior to be predictable and
> >> > > > > > interpretable
> >> > > > > > > so
> >> > > > > > > > > that
> >> > > > > > > > > > > >>>> SRE
> >> > > > > > > > > > > >>>>>> can
> >> > > > > > > > > > > >>>>>>> operate/debug Flink in an easier way.
> We
> >> can
> >> > > > list
> >> > > > > > > these
> >> > > > > > > > > > > >>>> pros/cons
> >> > > > > > > > > > > >>>>>>> altogether later.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I am wondering if we can first agree on the
> >> > > priority
> >> > > > of
> >> > > > > > > goals
> >> > > > > > > > > we want
> >> > > > > > > > > > > >>>>> to
> >> > > > > > > > > > > >>>>>>> achieve. IMO, it is a hard-requirement for
> the
> >> > > > > > user-facing
> >> > > > > > > > API
> >> > > > > > > > > to be
> >> > > > > > > > > > > >>>>>>> clearly defined and users should be able to
> >> use
> >> > the
> >> > > > API
> >> > > > > > > > without
> >> > > > > > > > > > > >>>> concern
> >> > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > >>>>>>> regression. And this requirement is more
> >> > important
> >> > > > than
> >> > > > > > the
> >> > > > > > > > > other
> >> > > > > > > > > > > >>>> goals
> >> > > > > > > > > > > >>>>>>> discussed above because it is related to the
> >> > > > > > > > > stability/performance of
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>> production job. What do you think?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>> Sounds good. Looking forward to learning
> >> more
> >> > > > ideas.
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> I have thought about this a bit more, and I
> >> > think
> >> > > we
> >> > > > > > don't
> >> > > > > > > > > need to
> >> > > > > > > > > > > >>>>>> check
> >> > > > > > > > > > > >>>>>>>> for the backpressure status, or how much
> >> > > overloaded
> >> > > > > all
> >> > > > > > of
> >> > > > > > > > the
> >> > > > > > > > > > > >>>>>> operators
> >> > > > > > > > > > > >>>>>>>> are.
> >> > > > > > > > > > > >>>>>>>> We could just check three things for source
> >> > > > operators:
> >> > > > > > > > > > > >>>>>>>> 1. pendingRecords (backlog length)
> >> > > > > > > > > > > >>>>>>>> 2. numRecordsInPerSecond
> >> > > > > > > > > > > >>>>>>>> 3. backPressuredTimeMsPerSecond
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> // int metricsUpdateInterval = 10s //
> >> obtained
> >> > > from
> >> > > > > > config
> >> > > > > > > > > > > >>>>>>>> // Next line calculates how many records
> can
> >> we
> >> > > > > consume
> >> > > > > > > from
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>> backlog,
> >> > > > > > > > > > > >>>>>>>> assuming
> >> > > > > > > > > > > >>>>>>>> // that magically the reason behind a
> >> > backpressure
> >> > > > > > > vanishes.
> >> > > > > > > > > We
> >> > > > > > > > > > > >>>> will
> >> > > > > > > > > > > >>>>>> use
> >> > > > > > > > > > > >>>>>>>> this only as
> >> > > > > > > > > > > >>>>>>>> // a safeguard  against scenarios like for
> >> > example
> >> > > > if
> >> > > > > > > > > backpressure
> >> > > > > > > > > > > >>>>> was
> >> > > > > > > > > > > >>>>>>>> caused by some
> >> > > > > > > > > > > >>>>>>>> // intermittent failure/performance
> >> degradation.
> >> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure =
> >> > > > > > > > > (numRecordsInPerSecond /
> >> > > > > > > > > > > >>>>> (1000
> >> > > > > > > > > > > >>>>>>>> - backPressuredTimeMsPerSecond / 1000)) *
> >> > > > > > > > > metricsUpdateInterval
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
> >> > > > > > > > > > > >>>>>> backPressuredTimeMsPerSecond =
> >> > > > > > > > > > > >>>>>>> 0, then
> maxRecordsConsumedWithoutBackpressure
> >> =
> >> > > > > > > > > > > >>>> numRecordsInPerSecond /
> >> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to
> the
> >> > above
> >> > > > > > > > algorithm.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Do you mean
> >> > "maxRecordsConsumedWithoutBackpressure
> >> > > =
> >> > > > > > > > > > > >>>>>> (numRecordsInPerSecond
> >> > > > > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond /
> 1000)) *
> >> > > > > > > > > > > >>>> metricsUpdateInterval"?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> // we are excluding
> >> > > > > > maxRecordsConsumedWithoutBackpressure
> >> > > > > > > > > from the
> >> > > > > > > > > > > >>>>>>> backlog
> >> > > > > > > > > > > >>>>>>>> as
> >> > > > > > > > > > > >>>>>>>> // a safeguard against an intermittent back
> >> > > pressure
> >> > > > > > > > > problems, so
> >> > > > > > > > > > > >>>>> that
> >> > > > > > > > > > > >>>>>> we
> >> > > > > > > > > > > >>>>>>>> don't
> >> > > > > > > > > > > >>>>>>>> // calculate next checkpoint interval far
> >> far in
> >> > > the
> >> > > > > > > future,
> >> > > > > > > > > while
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>>> backpressure
> >> > > > > > > > > > > >>>>>>>> // goes away before we will recalculate
> >> metrics
> >> > > and
> >> > > > > new
> >> > > > > > > > > > > >>>> checkpointing
> >> > > > > > > > > > > >>>>>>>> interval
> >> > > > > > > > > > > >>>>>>>> timeToConsumeBacklog = (pendingRecords -
> >> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure) /
> >> > > > > > > > numRecordsInPerSecond
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Then we can use those numbers to calculate
> >> > desired
> >> > > > > > > > > checkpointed
> >> > > > > > > > > > > >>>>>> interval
> >> > > > > > > > > > > >>>>>>>> for example like this:
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> long calculatedCheckpointInterval =
> >> > > > > > timeToConsumeBacklog /
> >> > > > > > > > 10;
> >> > > > > > > > > > > >>>> //this
> >> > > > > > > > > > > >>>>>> may
> >> > > > > > > > > > > >>>>>>>> need some refining
> >> > > > > > > > > > > >>>>>>>> long nextCheckpointInterval =
> >> > > > > > > > min(max(fastCheckpointInterval,
> >> > > > > > > > > > > >>>>>>>> calculatedCheckpointInterval),
> >> > > > > slowCheckpointInterval);
> >> > > > > > > > > > > >>>>>>>> long nextCheckpointTs = lastCheckpointTs +
> >> > > > > > > > > nextCheckpointInterval;
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> WDYT?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I think the idea of the above algorithm is
> to
> >> > > incline
> >> > > > > to
> >> > > > > > > use
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>>> fastCheckpointInterval unless we are very
> sure
> >> > the
> >> > > > > > backlog
> >> > > > > > > > > will take
> >> > > > > > > > > > > >>>> a
> >> > > > > > > > > > > >>>>>> long
> >> > > > > > > > > > > >>>>>>> time to process. This can alleviate the
> >> concern
> >> > of
> >> > > > > > > regression
> >> > > > > > > > > during
> >> > > > > > > > > > > >>>>> the
> >> > > > > > > > > > > >>>>>>> continuous_unbounded phase since we are more
> >> likely
> >> > > to
> >> > > > > use
> >> > > > > > > the
> >> > > > > > > > > > > >>>>>>> fastCheckpointInterval. However, it can
> cause
> >> > > > > regression
> >> > > > > > > > > during the
> >> > > > > > > > > > > >>>>>> bounded
> >> > > > > > > > > > > >>>>>>> phase.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I will use a concrete example to explain the
> >> risk
> >> > > of
> >> > > > > > > > > regression:
> >> > > > > > > > > > > >>>>>>> - The user is using HybridSource to read
> from
> >> > HDFS
> >> > > > > > followed
> >> > > > > > > > by
> >> > > > > > > > > Kafka.
> >> > > > > > > > > > > >>>>> The
> >> > > > > > > > > > > >>>>>>> data in HDFS is old and there is no need for
> >> data
> >> > > > > > freshness
> >> > > > > > > > > for the
> >> > > > > > > > > > > >>>>> data
> >> > > > > > > > > > > >>>>>> in
> >> > > > > > > > > > > >>>>>>> HDFS.
> >> > > > > > > > > > > >>>>>>> - The user configures the job as below:
> >> > > > > > > > > > > >>>>>>> - fastCheckpointInterval = 3 minutes
> >> > > > > > > > > > > >>>>>>> - slowCheckpointInterval = 30 minutes
> >> > > > > > > > > > > >>>>>>> - metricsUpdateInterval = 100 ms
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Using the above formula, we can know that
> >> once
> >> > > > > > > > pendingRecords
> >> > > > > > > > > > > >>>>>>> <= numRecordsInPerSecond * 30-minutes, then
> >> > > > > > > > > > > >>>>> calculatedCheckpointInterval
> >> > > > > > > > > > > >>>>>> <=
> >> > > > > > > > > > > >>>>>>> 3 minutes, meaning that we will use
> >> > > > > > fastCheckpointInterval
> >> > > > > > > as
> >> > > > > > > > > the
> >> > > > > > > > > > > >>>>>>> checkpointing interval. Then in the last 30
> >> > minutes
> >> > > > of
> >> > > > > > the
> >> > > > > > > > > bounded
> >> > > > > > > > > > > >>>>> phase,
> >> > > > > > > > > > > >>>>>>> the checkpointing frequency will be 10X
> higher
> >> > than
> >> > > > > what
> >> > > > > > > the
> >> > > > > > > > > user
> >> > > > > > > > > > > >>>>> wants.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Also note that the same issue would also
> >> > > considerably
> >> > > > > > limit
> >> > > > > > > > the
> >> > > > > > > > > > > >>>>> benefits
> >> > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > >>>>>>> the algorithm. For example, during the
> >> continuous
> >> > > > > phase,
> >> > > > > > > the
> >> > > > > > > > > > > >>>> algorithm
> >> > > > > > > > > > > >>>>>> will
> >> > > > > > > > > > > >>>>>>> only be better than the approach in FLIP-309
> >> when
> >> > > > there
> >> > > > > > is
> >> > > > > > > at
> >> > > > > > > > > least
> >> > > > > > > > > > > >>>>>>> 30-minutes worth of backlog in the source.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Sure, having a slower checkpointing interval
> >> in
> >> > > this
> >> > > > > > > extreme
> >> > > > > > > > > case
> >> > > > > > > > > > > >>>>> (where
> >> > > > > > > > > > > >>>>>>> there is 30-minutes backlog in the
> >> > > > continuous-unbounded
> >> > > > > > > phase)
> >> > > > > > > > > is
> >> > > > > > > > > > > >>>> still
> >> > > > > > > > > > > >>>>>>> useful when this happens. But since this is
> >> the
> >> > > > > un-common
> >> > > > > > > > > case, and
> >> > > > > > > > > > > >>>> the
> >> > > > > > > > > > > >>>>>>> right solution is probably to do capacity
> >> > planning
> >> > > to
> >> > > > > > avoid
> >> > > > > > > > > this from
> >> > > > > > > > > > > >>>>>>> happening in the first place, I am not sure
> >> it is
> >> > > > worth
> >> > > > > > > > > optimizing
> >> > > > > > > > > > > >>>> for
> >> > > > > > > > > > > >>>>>> this
> >> > > > > > > > > > > >>>>>>> case at the cost of regression in the
> bounded
> >> > phase
> >> > > > and
> >> > > > > > the
> >> > > > > > > > > reduced
> >> > > > > > > > > > > >>>>>>> operational predictability for users (e.g.
> >> what
> >> > > > > > > checkpointing
> >> > > > > > > > > > > >>>> interval
> >> > > > > > > > > > > >>>>>>> should I expect at this stage of the job).
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> I think the fundamental issue with this
> >> algorithm
> >> > > is
> >> > > > > that
> >> > > > > > > it
> >> > > > > > > > is
> >> > > > > > > > > > > >>>> applied
> >> > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > >>>>>>> both the bounded phases and the
> >> > continuous_unbounded
> >> > > > > > phases
> >> > > > > > > > > without
> >> > > > > > > > > > > >>>>>> knowing
> >> > > > > > > > > > > >>>>>>> which phase the job is running at. The only
> >> > > > information
> >> > > > > > it
> >> > > > > > > > can
> >> > > > > > > > > access
> >> > > > > > > > > > > >>>>> is
> >> > > > > > > > > > > >>>>>>> the backlog. But two sources with the same
> >> amount
> >> > > of
> >> > > > > > > backlog
> >> > > > > > > > > do not
> >> > > > > > > > > > > >>>>>>> necessarily mean they have the same data
> >> > freshness
> >> > > > > > > > requirement.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> In this particular example, users know that
> >> the
> >> > > data
> >> > > > in
> >> > > > > > > HDFS
> >> > > > > > > > > is very
> >> > > > > > > > > > > >>>>> old
> >> > > > > > > > > > > >>>>>>> and there is no need for data freshness.
> Users
> >> > can
> >> > > > > > express
> >> > > > > > > > > signals
> >> > > > > > > > > > > >>>> via
> >> > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > >>>>>>> per-source API proposed in the FLIP. This is
> >> why
> >> > > the
> >> > > > > > > current
> >> > > > > > > > > approach
> >> > > > > > > > > > > >>>>> in
> >> > > > > > > > > > > >>>>>>> FLIP-309 can be better in this case.
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> What do you think?
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>> Best,
> >> > > > > > > > > > > >>>>>>> Dong
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>> Best,
> >> > > > > > > > > > > >>>>>>>> Piotrek
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > >>>>>
> >> > > > > > > > > > > >>>>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
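
For readers following the quoted thread, here is a minimal sketch of the
backlog-based interval calculation discussed above. It uses the corrected
formula Dong suggested (dividing by 1 - backPressuredTimeMsPerSecond / 1000).
The class and method names are hypothetical and not part of Flink. The units
(seconds for metricsUpdateInterval, milliseconds for the intervals), the
handling of zero throughput, and the clamp for a fully back-pressured source
are assumptions that the thread leaves open.

```java
// Hypothetical helper, for illustration only; this is not a Flink API.
final class BacklogAwareInterval {
    private BacklogAwareInterval() {}

    static long nextCheckpointIntervalMs(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSec,
            long fastCheckpointIntervalMs,
            long slowCheckpointIntervalMs) {
        if (numRecordsInPerSecond <= 0) {
            // Assumption: with no observed throughput the backlog cannot be
            // projected, so treat any pending records as a long backlog.
            return pendingRecords > 0 ? slowCheckpointIntervalMs : fastCheckpointIntervalMs;
        }
        // Fraction of each second the source was not back-pressured.
        // Assumption: clamp so a fully back-pressured source does not divide by zero.
        double busyFraction = Math.max(1.0 - backPressuredTimeMsPerSecond / 1000.0, 0.001);

        // Records the source could consume in one metrics window if the
        // back pressure vanished; used as a safeguard against intermittent stalls.
        double maxRecordsConsumedWithoutBackpressure =
                numRecordsInPerSecond / busyFraction * metricsUpdateIntervalSec;

        // Time needed to drain the rest of the backlog at the observed rate.
        double timeToConsumeBacklogSec =
                Math.max(0.0, pendingRecords - maxRecordsConsumedWithoutBackpressure)
                        / numRecordsInPerSecond;

        // "timeToConsumeBacklog / 10" from the thread, converted to milliseconds.
        long calculatedCheckpointIntervalMs = (long) (timeToConsumeBacklogSec * 1000.0 / 10.0);

        // Clamp the result between the fast and the slow interval.
        return Math.min(
                Math.max(fastCheckpointIntervalMs, calculatedCheckpointIntervalMs),
                slowCheckpointIntervalMs);
    }
}
```

With Dong's example settings (fast = 3 minutes, slow = 30 minutes,
metricsUpdateInterval = 100 ms), a backlog worth 30 minutes of input at the
observed rate yields a calculated interval just under 3 minutes, so the method
returns the fast interval. That is the regression Dong describes for the last
30 minutes of the bounded phase.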
