+1 for interval-during-backlog

best,
leonard

> On Jul 14, 2023, at 11:38 PM, Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> 
> Hi All,
> 
> We had a lot of off-line discussions. As a result I would suggest dropping
> the idea of introducing an end-to-end-latency concept, until
> we can properly implement it, which will require more designing and
> experimenting. I would suggest starting with a more manual solution,
> where the user needs to configure concrete parameters, like
> `execution.checkpointing.max-interval` or `execution.flush-interval`.
> 
> FLIP-309 looks good to me, I would just rename
> `execution.checkpointing.interval-during-backlog` to
> `execution.checkpointing.max-interval`.
> 
> I would also reference future work: a solution that would allow setting
> `isProcessingBacklog` for sources like Kafka will be introduced via
> FLIP-328 [1].
> 
> Best,
> Piotrek
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> 
> On Wed, Jul 12, 2023 at 03:49, Dong Lin <lindon...@gmail.com> wrote:
> 
>> Hi Piotr,
>> 
>> I think I understand your motivation for suggesting
>> execution.slow-end-to-end-latency now. Please see my follow-up comments
>> (after the previous email) inline.
>> 
>> On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>> 
>>> Hi Dong,
>>> 
>>> Thanks for the updates, a couple of comments:
>>> 
>>>> If a record is generated by a source when the source's
>>> isProcessingBacklog is true, or some of the records used to
>>>> derive this record (by an operator) has isBacklog = true, then this
>>> record should have isBacklog = true. Otherwise,
>>>> this record should have isBacklog = false.
>>> 
>>> nit:
>>> I think this conflicts with the "Rule of thumb for non-source operators
>>> to set isBacklog = true for the records it emits:" section later on,
>>> in the case where an operator has mixed isBacklog = false and
>>> isBacklog = true inputs.
>>> 
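A minimal sketch of the isBacklog rule quoted above, read literally: a record is backlog if its source was processing backlog, or if any record it was derived from is backlog. The class and method names are hypothetical and are not part of Flink or of the FLIP. Under this reading, an operator with mixed isBacklog = false and isBacklog = true inputs emits isBacklog = true.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not a Flink API.
final class IsBacklogRule {
    private IsBacklogRule() {}

    // A record generated by a source inherits the source's isProcessingBacklog status.
    static boolean forSourceRecord(boolean sourceIsProcessingBacklog) {
        return sourceIsProcessingBacklog;
    }

    // A record derived by an operator is backlog if any contributing input is backlog.
    static boolean forDerivedRecord(List<Boolean> inputIsBacklog) {
        for (boolean isBacklog : inputIsBacklog) {
            if (isBacklog) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mixed inputs: one backlog record is enough to mark the result as backlog.
        System.out.println(forDerivedRecord(Arrays.asList(false, true)));  // prints true
        System.out.println(forDerivedRecord(Arrays.asList(false, false))); // prints false
    }
}
```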
>>>> execution.checkpointing.interval-during-backlog
>>> 
>>> Do we need to define this as an interval config parameter? Won't that add
>>> an option that will be almost instantly deprecated
>>> because what we actually would like to have is:
>>> execution.slow-end-to-end-latency and execution.end-to-end-latency
>>> 
>> 
>> I guess you are suggesting that we should allow users to specify a higher
>> end-to-end latency budget for records that are emitted by a two-phase
>> commit sink than for records that are emitted by a non-two-phase-commit
>> sink.
>> 
>> My concern with this approach is that it will increase the complexity of
>> the definition of "processing latency requirement", as well as the
>> complexity of the Flink runtime code that handles it. Currently, the
>> FLIP-325 defines end-to-end latency as an attribute of the records that is
>> statically assigned when the record is generated at the source, regardless
>> of how it will be emitted later in the topology. If we make the changes
>> proposed above, we would need to define the latency requirement w.r.t. the
>> attribute of the operators that it travels through before its result is
>> emitted, which is less intuitive and more complex.
>> 
>> For now, it is not clear whether it is necessary to have two categories of
>> latency requirement for the same job. Maybe it is reasonable to assume that
>> if a job has a two-phase commit sink and the user is OK to emit some results
>> at a 1 minute interval, then more likely than not the user is also OK to
>> emit all results at a 1 minute interval, including those that go through a
>> non-two-phase-commit sink?
>> 
>> If we do want to support different end-to-end latency depending on whether
>> the record is emitted by a two-phase commit sink, I would prefer to still
>> use execution.checkpointing.interval-during-backlog instead of
>> execution.slow-end-to-end-latency. This allows us to keep the concept of
>> end-to-end latency simple. Also, by explicitly including "checkpointing
>> interval" in the name of the config that directly affects checkpointing
>> interval, we can make it easier and more intuitive for users to understand
>> the impact and set proper value for such configs.
>> 
>> What do you think?
>> 
>> Best,
>> Dong
>> 
>> 
>>> Maybe we can introduce only `execution.slow-end-to-end-latency` (% maybe a
>>> better name), and for the time being
>>> use it as the checkpoint interval value during backlog?
>> 
>> 
>>> Or do you envision that in the future users will be configuring only:
>>> - execution.end-to-end-latency
>>> and only optionally:
>>> - execution.checkpointing.interval-during-backlog
>>> ?
>>> 
>>> Best Piotrek
>>> 
>>> PS, I will read the summary that you have just published later, but I
>>> think we don't need to block this FLIP on the
>>> existence of that high level summary.
>>> 
>>> On Tue, Jul 11, 2023 at 17:49, Dong Lin <lindon...@gmail.com> wrote:
>>> 
>>>> Hi Piotr and everyone,
>>>> 
>>>> I have documented the vision with a summary of the existing work in this
>>>> doc. Please feel free to review/comment/edit this doc. Looking forward to
>>>> working with you together in this line of work.
>>>> 
>>>> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
>>>> 
>>>> Best,
>>>> Dong
>>>> 
>>>> On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi All,
>>>>> 
>>>>> Dong and I chatted offline about the above-mentioned issues (thanks for
>>>>> that offline chat, I think it helped both of us a lot). The summary is
>>>>> below.
>>>>> 
>>>>>> Previously, I thought you meant to add a generic logic in
>>>>>> SourceReaderBase
>>>>>> to read existing metrics (e.g. backpressure) and emit the
>>>>>> IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
>>>>>> misunderstood your suggestions.
>>>>>> 
>>>>>> After double-checking your previous suggestion, I am wondering if you
>>>>>> are OK with the following approach:
>>>>>> 
>>>>>> - Add a job-level config
>>>>>> execution.checkpointing.interval-during-backlog
>>>>>> - Add an API SourceReaderContext#setProcessingBacklog(boolean
>>>>>> isProcessingBacklog).
>>>>>> - When this API is invoked, it internally sends an
>>>>>> internal SourceReaderBacklogEvent to SourceCoordinator.
>>>>>> - SourceCoordinator should keep track of the latest isProcessingBacklog
>>>>>> status from all its subtasks. And for now, we will hardcode the logic
>>>>>> such that if any source reader says it is under backlog, then
>>>>>> execution.checkpointing.interval-during-backlog is used.
>>>>>> 
>>>>>> This approach looks good to me as it can achieve the same performance
>>>>>> with the same number of public APIs for the target use-case. And I
>>>>>> suppose in the future we might be able to re-use this API for source
>>>>>> reader to set its backlog status based on its backpressure metrics,
>>>>>> which could be an extra advantage over the current approach.
>>>>>> 
>>>>>> Do you think we can agree to adopt the approach described above?
>>>>> 
>>>>> Yes, I think that's a viable approach. I would be perfectly fine with not
>>>>> introducing
>>>>> `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog)`
>>>>> and not sending the `SourceReaderBacklogEvent` from SourceReader to JM
>>>>> in this FLIP. It could be implemented once we decide to add some more
>>>>> generic ways of detecting backlog/backpressure on the SourceReader level.
>>>>> 
>>>>> I think we could also just keep the current proposal of adding
>>>>> `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the
>>>>> sources that can set it on the `SplitEnumerator` level. Later we could
>>>>> merge this with other mechanisms of detecting "isProcessingBacklog",
>>>>> like based on watermark lag, backpressure, etc, via some component
>>>>> running on the JM.
>>>>> 
>>>>> At the same time I'm fine with having the "isProcessingBacklog" concept
>>>>> to switch runtime back and forth between high and low latency modes
>>>>> instead of "backpressure". In FLIP-325 I have asked:
>>>>> 
>>>>>> I think there is one thing that has been discussed neither here nor in
>>>>>> FLIP-309. Given that we have
>>>>>> three dimensions:
>>>>>> - e2e latency/checkpointing interval
>>>>>> - enabling some kind of batching/buffering on the operator level
>>>>>> - how much resources we want to allocate to the job
>>>>>> 
>>>>>> How do we want Flink to adjust itself between those three? For example:
>>>>>> a) Should we assume that a given job has a fixed amount of assigned
>>>>>>  resources and make it paramount that Flink doesn't exceed those
>>>>>>  available resources? So in case of backpressure, we should extend
>>>>>>  checkpointing intervals, emit records less frequently and in batches.
>>>>>> b) Or should we assume that the amount of resources is flexible (up to
>>>>>>  a point?), and the desired e2e latency is the paramount aspect? So in
>>>>>>  case of backpressure, we should still adhere to the configured e2e
>>>>>>  latency, and wait for the user or autoscaler to scale up the job?
>>>>>> 
>>>>>> In case of a), I think the concept of "isProcessingBacklog" is not
>>>>>> needed, we could steer the behaviour only
>>>>>> using the backpressure information.
>>>>>> 
>>>>>> On the other hand, in case of b), "isProcessingBacklog" information
>>>>>> might be helpful, to let Flink know that
>>>>>> we can safely decrease the e2e latency/checkpoint interval even if
>>>>>> there is no backpressure, to use fewer
>>>>>> resources (and let the autoscaler scale down the job).
>>>>>> 
>>>>>> Do we want to have both, or only one of those? Do a) and b) complement
>>>>>> one another? If job is backpressured,
>>>>>> we should follow a) and expose to autoscaler/users information "Hey!
>>>>>> I'm barely keeping up! I need more resources!".
>>>>>> While, when there is no backpressure and latency doesn't matter
>>>>>> (isProcessingBacklog=true), we can limit the resource
>>>>>> usage
>>>>> 
>>>>> After thinking this over:
>>>>> - the case that we don't have "isProcessingBacklog" information, but the
>>>>>  source operator is back pressured, must be intermittent. Either back
>>>>>  pressure will go away, or shortly we should reach the
>>>>>  "isProcessingBacklog" state anyway
>>>>> - and even if we implement some back pressure detecting algorithm to
>>>>>  switch the runtime into the "high latency mode", we can always report
>>>>>  that as "isProcessingBacklog" anyway, as runtime should react the same
>>>>>  way in both cases (backpressure and "isProcessingBacklog" states).
>>>>> 
>>>>> ===============
>>>>> 
>>>>> With a common understanding of the final solution that we want to have
>>>>> in the future, I'm pretty much fine with the current
>>>>> FLIP-309 proposal, with a couple of remarks:
>>>>> 1. Could you include in the FLIP-309 the long term solution as we have
>>>>>    discussed?
>>>>>    a) Would be nice to have some diagram showing how the
>>>>>       "isProcessingBacklog" information would be travelling, being
>>>>>       aggregated and what will be done with that information
>>>>>       (from SourceReader/SplitEnumerator to some "component"
>>>>>       aggregating it, and then ... ?)
>>>>> 2. For me "processing backlog" doesn't necessarily equate to
>>>>>    "backpressure" (HybridSource can be both NOT backpressured and
>>>>>    processing backlog at the same time). If you think the same way, can
>>>>>    you include that definition of "processing backlog" in the FLIP
>>>>>    including its relation to the backpressure state? If not, we need to
>>>>>    align on that definition first :)
>>>>> 
>>>>> Also I'm missing a big picture description, that would show what you are
>>>>> trying to achieve and what's the overarching vision
>>>>> behind all of the current and future FLIPs that you are planning in this
>>>>> area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
>>>>> Or was it described somewhere and I've missed it?
>>>>> 
>>>>> Best,
>>>>> Piotrek
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Jul 6, 2023 at 06:25, Dong Lin <lindon...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Piotr,
>>>>>> 
>>>>>> I am sorry if you feel unhappy or upset with us for not following/fixing
>>>>>> your proposal. It is not my intention to give you this feeling. After
>>>>>> all, we are all trying to make Flink better, to support more use cases
>>>>>> with the most maintainable code. I hope you understand that just like
>>>>>> you, I have also been doing my best to think through various design
>>>>>> options and taking time to evaluate the pros/cons. Eventually, we
>>>>>> probably still need to reach consensus by clearly listing and comparing
>>>>>> the objective pros/cons of different proposals and identifying the best
>>>>>> choice.
>>>>>> 
>>>>>> Regarding your concern (or frustration) that we are always finding
>>>>>> issues in your proposal, I would say it is normal (and probably
>>>>>> necessary) for developers to find pros/cons in each other's solutions,
>>>>>> so that we can eventually pick the right one. I will appreciate anyone
>>>>>> who can correctly pinpoint the concrete issue in my proposal so that I
>>>>>> can improve it or choose an alternative solution.
>>>>>> 
>>>>>> Regarding your concern that we are not spending enough effort to find
>>>>>> solutions and that the problem in your solution can be solved in a
>>>>>> minute, I would like to say that is not true. For each of your previous
>>>>>> proposals, I typically spent 1+ hours thinking through your proposal to
>>>>>> understand whether it works and why it does not work, and another 1+
>>>>>> hour to write down the details and explain why it does not work. And I
>>>>>> have had a variety of offline discussions with my colleagues discussing
>>>>>> various proposals (including yours) with 6+ hours in total. Maybe I am
>>>>>> not capable enough to fix those issues in one minute or so. If you
>>>>>> think your proposal can be easily fixed in one minute or so, I would
>>>>>> really appreciate it if you can think through your proposal and fix it
>>>>>> in the first place :)
>>>>>> 
>>>>>> For your information, I have had several long discussions with my
>>>>>> colleagues at Alibaba and also Becket on this FLIP. We have seriously
>>>>>> considered your proposals and discussed in detail what the pros/cons
>>>>>> are and whether we can improve these solutions. The initial version of
>>>>>> this FLIP (which allows the source operator to specify checkpoint
>>>>>> intervals) did not get enough support due to concerns of not being
>>>>>> generic (i.e. users need to specify checkpoint intervals on a
>>>>>> per-source basis). It was only after I updated the FLIP to use the
>>>>>> job-level execution.checkpointing.interval-during-backlog that they
>>>>>> agreed to give +1 to the FLIP. What I want to tell you is that your
>>>>>> suggestions have been taken seriously, and the quality of the FLIP has
>>>>>> been taken seriously by all those who have voted. As a result of taking
>>>>>> your suggestion seriously and trying to find improvements, we updated
>>>>>> the FLIP to use isProcessingBacklog.
>>>>>> 
>>>>>> I am wondering, do you think it will be useful to discuss face-to-face
>>>>>> via video conference call? It is not just between you and me. We can
>>>>>> invite the developers who are interested to join and help with the
>>>>>> discussion. That might improve communication efficiency and help us
>>>>>> understand each other better :)
>>>>>> 
>>>>>> I am writing this long email to hopefully get your understanding. I
>>>>>> care much more about the quality of the eventual solution rather than
>>>>>> who proposed the solution. Please bear with me and see my comments
>>>>>> inline, with an explanation of the pros/cons of these proposals.
>>>>>> 
>>>>>> 
>>>>>> On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Guys,
>>>>>>> 
>>>>>>> I would like to ask you again, to spend a bit more effort on trying to
>>>>>>> find solutions, not just pointing out problems. For 1.5 months,
>>>>>>> the discussion doesn't go in circle, but I'm suggesting a solution, you
>>>>>>> are trying to undermine it with some arguments, I'm coming
>>>>>>> back with a fix, often an extremely easy one, only for you to try to
>>>>>>> find yet another "issue". It doesn't bode well, if you are finding
>>>>>>> a "problem" that can be solved with a minute or so of thinking or even
>>>>>>> has already been solved.
>>>>>>> 
>>>>>>> I have provided you so far with at least three distinct solutions that
>>>>>>> could address your exact target use-case. Two [1][2] generic
>>>>>>> enough to be probably good enough for the foreseeable future, one
>>>>>>> intermediate and not generic [3] but which wouldn't
>>>>>>> require @Public API changes or some custom hidden interfaces.
>>>>>> 
>>>>>> 
>>>>>>> All in all:
>>>>>>> - [1] with added metric hints like "isProcessingBacklog" solves your
>>>>>>>  target use case pretty well. Downside is having to improve
>>>>>>>  how JM is collecting/aggregating metrics
>>>>>>> 
>>>>>> 
>>>>>> Here is my analysis of this proposal compared to the current approach
>>>>>> in the FLIP-309.
>>>>>> 
>>>>>> pros:
>>>>>> - No need to add the public API
>>>>>> SplitEnumeratorContext#setIsProcessingBacklog.
>>>>>> cons:
>>>>>> - Need to add a public API that subclasses of SourceReader can use to
>>>>>> specify its IsProcessingBacklog metric value.
>>>>>> - Source Coordinator needs to periodically pull the isProcessingBacklog
>>>>>> metrics from all TMs throughout the job execution.
>>>>>> 
>>>>>> Here is why I think the cons outweigh the pros:
>>>>>> 1) JM needs to collect/aggregate metrics with extra runtime overhead,
>>>>>> which is not necessary for the target use-case with the push-based
>>>>>> approach in FLIP-309.
>>>>>> 2) For the target use-case, it is simpler and more intuitive for source
>>>>>> operators (e.g. HybridSource, MySQL CDC source) to be able to set their
>>>>>> isProcessingBacklog status in the SplitEnumerator. This is because the
>>>>>> switch between bounded/unbounded stages happens in their
>>>>>> SplitEnumerator.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> - [2] is basically an equivalent of [1], replacing metrics with events.
>>>>>>> It also is a superset of your proposal
>>>>>>> 
>>>>>> 
>>>>>> Previously, I thought you meant to add a generic logic in
>>>>>> SourceReaderBase
>>>>>> to read existing metrics (e.g. backpressure) and emit the
>>>>>> IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
>>>>>> misunderstood your suggestions.
>>>>>> 
>>>>>> After double-checking your previous suggestion, I am wondering if you
>>>>>> are OK with the following approach:
>>>>>> 
>>>>>> - Add a job-level config
>>>>>> execution.checkpointing.interval-during-backlog
>>>>>> - Add an API SourceReaderContext#setProcessingBacklog(boolean
>>>>>> isProcessingBacklog).
>>>>>> - When this API is invoked, it internally sends an
>>>>>> internal SourceReaderBacklogEvent to SourceCoordinator.
>>>>>> - SourceCoordinator should keep track of the latest isProcessingBacklog
>>>>>> status from all its subtasks. And for now, we will hardcode the logic
>>>>>> such that if any source reader says it is under backlog, then
>>>>>> execution.checkpointing.interval-during-backlog is used.
>>>>>> 
>>>>>> This approach looks good to me as it can achieve the same performance
>>>>>> with the same number of public APIs for the target use-case. And I
>>>>>> suppose in the future we might be able to re-use this API for source
>>>>>> reader to set its backlog status based on its backpressure metrics,
>>>>>> which could be an extra advantage over the current approach.
>>>>>> 
>>>>>> Do you think we can agree to adopt the approach described above?
>>>>>> 
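The coordinator-side rule in the list above ("if any source reader says it is under backlog, then execution.checkpointing.interval-during-backlog is used") could be sketched roughly as follows. This is only an illustration under assumptions: `BacklogStatusTracker` and its methods are hypothetical names, not Flink's SourceCoordinator, and the two intervals are passed in directly rather than read from the job configuration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the coordinator-side bookkeeping; not a Flink class.
final class BacklogStatusTracker {
    // Latest isProcessingBacklog status reported by each source subtask.
    private final Map<Integer, Boolean> latestStatusBySubtask = new HashMap<>();
    private final Duration regularInterval;
    private final Duration intervalDuringBacklog;

    BacklogStatusTracker(Duration regularInterval, Duration intervalDuringBacklog) {
        this.regularInterval = regularInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    // Models receiving a backlog event from one source reader; the latest report wins.
    void onBacklogEvent(int subtaskId, boolean isProcessingBacklog) {
        latestStatusBySubtask.put(subtaskId, isProcessingBacklog);
    }

    // Hardcoded rule from the proposal: any reader under backlog switches the job.
    boolean anyReaderUnderBacklog() {
        return latestStatusBySubtask.containsValue(Boolean.TRUE);
    }

    Duration effectiveCheckpointInterval() {
        return anyReaderUnderBacklog() ? intervalDuringBacklog : regularInterval;
    }

    public static void main(String[] args) {
        BacklogStatusTracker tracker =
                new BacklogStatusTracker(Duration.ofSeconds(30), Duration.ofMinutes(5));
        tracker.onBacklogEvent(0, true);
        tracker.onBacklogEvent(1, false);
        System.out.println(tracker.effectiveCheckpointInterval()); // prints PT5M
        tracker.onBacklogEvent(0, false);
        System.out.println(tracker.effectiveCheckpointInterval()); // prints PT30S
    }
}
```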
>>>>>> 
>>>>>>> - [3] yes, it's hacky, but it's a solution that could be thrown away
>>>>>>>  once we implement [1] or [2]. The only real theoretical
>>>>>>>  downside is that it cannot control the long checkpoint exactly (short
>>>>>>>  checkpoint interval has to be a divisor of the long checkpoint
>>>>>>>  interval), but I simply can not imagine a practical use where that
>>>>>>>  would be a blocker for a user. Please..., someone wanting to set
>>>>>>>  short checkpoint interval to 3min and long to 7 minutes, and that
>>>>>>>  someone can not accept the long interval to be 9 minutes?
>>>>>>>  And that's even ignoring the fact that if someone has an issue with
>>>>>>>  the 3 minutes checkpoint interval, I can hardly think that merely
>>>>>>>  doubling the interval to 7 minutes would significantly solve any
>>>>>>>  problem for that user.
>>>>>>> 
>>>>>> 
>>>>>> Yes, this is a fabricated example that shows
>>>>>> execution.checkpointing.interval-during-backlog might not be accurately
>>>>>> enforced with this option. I think you are probably right that it might
>>>>>> not matter that much. I just think we should try our best to make Flink
>>>>>> public API's semantics (including configuration) clear, simple, and
>>>>>> enforceable. If we can make the user-facing configuration enforceable
>>>>>> at the cost of an extra developer facing API (i.e.
>>>>>> setProcessingBacklog(...)), I would prefer to do this.
>>>>>> 
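The 3 minute / 7 minute / 9 minute arithmetic in the exchange above follows if, under option [3], the long interval is rounded up to the next multiple of the short interval. That rounding direction is an assumption inferred from the 9 minute figure; the helper below is hypothetical and only works the numbers through.

```java
// Hypothetical helper for illustration only; not part of Flink or of the proposal.
final class CheckpointIntervalRounding {
    private CheckpointIntervalRounding() {}

    // Rounds the requested long interval up to the nearest multiple of the short
    // interval, since the short interval must divide the long one under option [3].
    static long roundUpToMultiple(long shortIntervalMinutes, long requestedLongIntervalMinutes) {
        if (shortIntervalMinutes <= 0 || requestedLongIntervalMinutes <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        // Integer ceiling division: number of short intervals needed to cover the request.
        long multiples =
                (requestedLongIntervalMinutes + shortIntervalMinutes - 1) / shortIntervalMinutes;
        return multiples * shortIntervalMinutes;
    }

    public static void main(String[] args) {
        // Short = 3 min, requested long = 7 min: the enforceable long interval is 9 min.
        System.out.println(roundUpToMultiple(3, 7)); // prints 9
    }
}
```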
>>>>>> It seems that we both agree that option [2] is better than [3]. I will
>>>>>> skip the further comments for this option and we can probably focus on
>>>>>> option [2] :)
>>>>>> 
>>>>>> 
>>>>>>> Dong a long time ago you wrote:
>>>>>>>> Sure. Then let's decide the final solution first.
>>>>>>> 
>>>>>>> Have you thought about that? Maybe I'm wrong but I don't remember you
>>>>>>> describing in any of your proposals how they could be
>>>>>>> extended in the future, to cover more generic cases. Regardless if you
>>>>>>> either don't believe in the generic solution or struggle to
>>>>>>> 
>>>>>> 
>>>>>> Yes, I have thought about the plan to extend the current FLIP to
>>>>>> support the metrics-based (e.g. backpressure) solution you described
>>>>>> earlier. Actually, I mentioned multiple times in the earlier email that
>>>>>> your suggestion of using metrics is valuable and I will do this in a
>>>>>> follow-up FLIP.
>>>>>> 
>>>>>> Here are my comments from the previous email:
>>>>>> - See "I will add follow-up FLIPs to make use of the event-time metrics
>>>>>> and backpressure metrics" from Jul 3, 2023, 6:39 PM
>>>>>> - See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
>>>>>> - See "we will create a followup FLIP (probably in FLIP-328)" from
>>>>>> Jun 29, 2023, 11:01 AM
>>>>>> 
>>>>>> Frankly speaking, I think the idea around using the backpressure
>>>>>> metrics still needs a bit more thinking before we can propose a FLIP.
>>>>>> But I am pretty sure we can make use of the watermark/event-time to
>>>>>> determine the backlog status.
>>>>>> 
>>>>>>> grasp it, if you can come back with something that can be easily
>>>>>>> extended in the future, up to a point where one could implement
>>>>>>> something similar to this backpressure detecting algorithm that I
>>>>>>> mentioned many times before, I would be happy to discuss and
>>>>>>> support it.
>>>>>>> 
>>>>>> 
>>>>>> Here is my idea of extending the source reader to support
>>>>>> event-time-based backlog detecting algorithms:
>>>>>> 
>>>>>> - Add a job-level config such as watermark-lag-threshold-for-backlog.
>>>>>> If any source reader determines that the event-timestamp is available
>>>>>> and the system-time - watermark exceeds this threshold, then the source
>>>>>> reader considers its isProcessingBacklog=true.
>>>>>> - The source reader can send an event to the source coordinator. Note
>>>>>> that this might be doable in the SourceReaderBase without adding any
>>>>>> public API which the concrete SourceReader subclass needs to explicitly
>>>>>> invoke.
>>>>>> - And in the future if FLIP-325 is accepted, instead of sending the
>>>>>> event to SourceCoordinator and letting SourceCoordinator inform the
>>>>>> checkpoint coordinator, the source reader might just emit the
>>>>>> information as part of the RecordAttributes and let the two-phase
>>>>>> commit sink inform the checkpoint coordinator.
>>>>>> 
>>>>>> Note that this is a sketch of the idea and it might need further
>>>>>> improvement. I just hope you understand that we have thought about this
>>>>>> idea and did quite a lot of thinking for these design options. If it is
>>>>>> OK with you, I hope we can make incremental progress and discuss the
>>>>>> metrics-based solution separately in a follow-up FLIP.
>>>>>> 
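The watermark-lag check from the first bullet of the idea above could look roughly like this. It is only a sketch under assumptions: the class name is hypothetical, the threshold stands in for the proposed watermark-lag-threshold-for-backlog config, and treating a missing event timestamp as "not backlog" is a choice made here because the text only specifies the case where the timestamp is available.

```java
import java.time.Duration;
import java.util.OptionalLong;

// Hypothetical detector for illustration only; not a Flink class.
final class WatermarkLagBacklogDetector {
    // Stands in for the proposed job-level watermark-lag-threshold-for-backlog config.
    private final Duration lagThreshold;

    WatermarkLagBacklogDetector(Duration lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    // Returns true when event time is available and (system-time - watermark)
    // exceeds the threshold.
    boolean isProcessingBacklog(long systemTimeMillis, OptionalLong watermarkMillis) {
        if (!watermarkMillis.isPresent()) {
            // Assumption: without event time, this check cannot declare backlog.
            return false;
        }
        long lagMillis = systemTimeMillis - watermarkMillis.getAsLong();
        return lagMillis > lagThreshold.toMillis();
    }

    public static void main(String[] args) {
        WatermarkLagBacklogDetector detector =
                new WatermarkLagBacklogDetector(Duration.ofMinutes(10));
        long now = 3_600_000L; // arbitrary system time in milliseconds
        // Watermark is 30 minutes behind: lag exceeds the 10 minute threshold.
        System.out.println(detector.isProcessingBacklog(now, OptionalLong.of(now - 1_800_000L))); // prints true
        // Watermark is 1 minute behind: within the threshold.
        System.out.println(detector.isProcessingBacklog(now, OptionalLong.of(now - 60_000L))); // prints false
    }
}
```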
>>>>>> Last but not least, thanks for taking so much time to leave comments
>>>>>> and help us improve the FLIP. Please kindly bear with us in this
>>>>>> discussion. I am looking forward to collaborating with you to find the
>>>>>> best design for the target use-cases.
>>>>>> 
>>>>>> Best,
>>>>>> Dong
>>>>>> 
>>>>>> 
>>>>>>> Hang, about your points 1. and 2., do you think those problems are
>>>>>>> insurmountable and blockers for that counter proposal?
>>>>>>> 
>>>>>>>> 1. It is hard to find the error checkpoint.
>>>>>>> 
>>>>>>> No it's not, please take a look at what I exactly proposed and maybe at
>>>>>>> the code.
>>>>>>> 
>>>>>>>> 2. (...) The failed checkpoint may make them think the job is unhealthy.
>>>>>>> 
>>>>>>> Please read again what I wrote in [3]. I'm mentioning there a solution
>>>>>>> for this exact "problem".
>>>>>>> 
>>>>>>> About the necessity of the config value, I'm still not convinced that's
>>>>>>> needed from the start, but yes we can add some config option
>>>>>>> if you think otherwise. This option, if named properly, could be
>>>>>>> re-used in the future for different solutions, so that's fine by me.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Piotrek
>>>>>>> 
>>>>>>> [1] Introduced in my very first e-mail from 23 May 2023, 16:26, and
>>>>>>> refined later with point "2." in my e-mail from 16 June 2023, 17:58
>>>>>>> [2] Section "2. ===============" in my e-mail from 30 June 2023, 16:34
>>>>>>> [3] Section "3. ===============" in my e-mail from 30 June 2023, 16:34
>>>>>>> 
>>>>>>> All times in CEST.
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
