Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-18 Thread Leonard Xu
+1 for interval-during-backlog. Best, Leonard > On Jul 14, 2023, at 11:38 PM, Piotr Nowojski wrote: > > Hi All, > > We had a lot of off-line discussions. As a result I would suggest dropping > the idea of introducing an end-to-end-latency concept, until > we can properly implement it, which

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-18 Thread Piotr Nowojski
Thanks Dong! Piotrek On Tue, Jul 18, 2023 at 06:04 Dong Lin wrote: > Hi all, > > I have updated FLIP-309 as suggested by Piotr to include a reference to > FLIP-328 in the future work section. > > Piotr, Stephan, and I discussed offline regarding the choice > between

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-17 Thread Dong Lin
Hi all, I have updated FLIP-309 as suggested by Piotr to include a reference to FLIP-328 in the future work section. Piotr, Stephan, and I discussed offline the choice between execution.checkpointing.max-interval and execution.checkpointing.interval-during-backlog. The advantage of

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-14 Thread Piotr Nowojski
Hi All, We had a lot of off-line discussions. As a result I would suggest dropping the idea of introducing an end-to-end-latency concept, until we can properly implement it, which will require more designing and experimenting. I would suggest starting with a more manual solution, where the user

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr, I think I understand your motivation for suggesting execution.slow-end-to-end-latency now. Please see my follow-up comments (after the previous email) inline. On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski wrote: > Hi Dong, > > Thanks for the updates, a couple of comments: > > > If

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr, Thanks for the comments. Please see my reply inline. On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski wrote: > Hi Dong, > > Thanks for the updates, a couple of comments: > > > If a record is generated by a source when the source's > isProcessingBacklog is true, or some of the records

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Piotr Nowojski
Hi Dong, Thanks for the updates, a couple of comments: > If a record is generated by a source when the source's isProcessingBacklog is true, or some of the records used to > derive this record (by an operator) have isBacklog = true, then this record should have isBacklog = true. Otherwise, > this
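The propagation rule quoted above amounts to an OR over a record's provenance. A minimal Python sketch, assuming a hypothetical `Record` type with an `is_backlog` flag (this is not a Flink class, and the helper names are illustrative only):

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Record:
    """Hypothetical record carrying the proposed isBacklog flag (not a Flink class)."""
    value: object
    is_backlog: bool


def emit_from_source(value: object, source_is_processing_backlog: bool) -> Record:
    # A record generated by a source inherits the source's isProcessingBacklog status.
    return Record(value, source_is_processing_backlog)


def derive(value: object, inputs: Sequence[Record]) -> Record:
    # An operator's output is backlog if ANY record used to derive it is backlog.
    return Record(value, any(r.is_backlog for r in inputs))


joined = derive("a+b", [emit_from_source("a", True), emit_from_source("b", False)])
print(joined.is_backlog)  # True
```

Joining one backlog record with one real-time record yields a backlog record, which is the conservative reading of "some of the records used to derive this record".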

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr and everyone, I have documented the vision with a summary of the existing work in this doc. Please feel free to review/comment/edit this doc. Looking forward to working together with you on this line of work.

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr, Thank you for all the discussions! I will ask for a meeting in the future when we have prolonged discussions like this :) Please see my comments inline. BTW, I am hoping we can make this feature available in Flink 1.18, which will feature freeze soon on July 24. If this FLIP looks

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-10 Thread Piotr Nowojski
Hi All, Dong and I chatted offline about the above-mentioned issues (thanks for that offline chat; I think it helped both of us a lot). The summary is below. > Previously, I thought you meant to add a generic logic in SourceReaderBase > to read existing metrics (e.g. backpressure) and emit the >

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Dong Lin
Hi Piotr, I am sorry if you feel unhappy or upset with us for not following/fixing your proposal. It is not my intention to give you this feeling. After all, we are all trying to make Flink better, to support more use cases with the most maintainable code. I hope you understand that just like you,

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Piotr Nowojski
Hi Guys, I would like to ask you again to spend a bit more effort on trying to find solutions, not just pointing out problems. For 1.5 months, the discussion hasn't exactly gone in circles; rather, I suggest a solution, you try to undermine it with some arguments, I come back with a fix, often

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Hang Ruan
Hi, Piotr & Dong. Thanks for the discussion. IMO, the provided counter-proposal is not a good idea. There are some concerns from my side. 1. It is hard to find the erroneous checkpoint. If there are other errors causing the checkpoint failure, we have to check every failed checkpoint to

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-04 Thread Dong Lin
Hi Piotr, Any suggestion on how we can practically move forward to address the target use case? My understanding is that the current proposal does not have any correctness/performance issues. And it allows the extension to support all the extra use cases without having to throw away the proposed

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-03 Thread Dong Lin
Hi Piotr, Please see my comments inline. On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski wrote: > Hi Dong, > > Starting from the end: > > > It seems that the only benefit of this approach is to avoid > > adding SplitEnumeratorContext#setIsProcessingBacklog. > > Yes, that's the major benefit of

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-03 Thread Piotr Nowojski
Hi Dong, Starting from the end: > It seems that the only benefit of this approach is to avoid > adding SplitEnumeratorContext#setIsProcessingBacklog. Yes, that's the major benefit of this counter-proposal. > In the target use-case, user still want to do checkpoint (though at a > larger

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-01 Thread Dong Lin
Hi Piotr, Thank you for providing further suggestions to help improve the API. Please see my comments inline. On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski wrote: > Hey, > > Sorry for a late reply, I was OoO for a week. I have three things to point > out. > > 1. === > > The

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-30 Thread Piotr Nowojski
Hey, Sorry for a late reply, I was OoO for a week. I have three things to point out. 1. === The updated proposal is indeed better, but to be honest I still don't like it, for mostly the same reasons that I have mentioned earlier: - only a partial solution, that doesn't address all

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread feng xiangyu
Hi Dong, Thanks for your quick reply. I think this has truly solved our problem and will enable us to upgrade our existing jobs more seamlessly. Best, Xiangyu On Thu, Jun 29, 2023 at 10:50 Dong Lin wrote: > Hi Feng, > > Thanks for the feedback. Yes, you can configure the >

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Thanks everyone (and specifically Piotr) for your valuable suggestions and review! We will open the voting thread for this FLIP. We hope to make this feature available in Flink 1.18 release, which will feature freeze on July 11. Piotr: we will create a followup FLIP (probably in FLIP-328

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Hi Feng, Thanks for the feedback. Yes, you can configure execution.checkpointing.interval-during-backlog to effectively disable checkpointing during backlog. Prior to your comment, the FLIP allowed users to do this by setting the config value to something large (e.g. 365 days). After thinking
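How the two intervals interact can be modeled in a few lines. This is a Python model, not Flink code, and the function names are hypothetical. It assumes the semantics that Flink's documentation later gives for `execution.checkpointing.interval-during-backlog`, as far as we understand them: the backlog interval applies while any source reports `isProcessingBacklog = true`; a value of 0 means no checkpoints during backlog; any other value must be at least `execution.checkpointing.interval`.

```python
from datetime import timedelta
from typing import Iterable, Optional


def validate(interval: timedelta, interval_during_backlog: Optional[timedelta]) -> None:
    # Assumed constraint: the backlog interval is unset, 0 (disabled), or >= the regular interval.
    if interval_during_backlog is None:
        return
    if interval_during_backlog != timedelta(0) and interval_during_backlog < interval:
        raise ValueError("interval-during-backlog must be 0 or >= interval")


def effective_interval(
    interval: timedelta,
    interval_during_backlog: Optional[timedelta],
    sources_processing_backlog: Iterable[bool],
) -> Optional[timedelta]:
    """Interval used to schedule checkpoints; None means no periodic checkpoints."""
    validate(interval, interval_during_backlog)
    # Unset backlog interval, or no source in backlog: keep the regular interval.
    if interval_during_backlog is None or not any(sources_processing_backlog):
        return interval
    # A 0 backlog interval disables checkpointing while any source is in backlog.
    if interval_during_backlog == timedelta(0):
        return None
    return interval_during_backlog
```

Under this model, Feng's "skip checkpoints until the batch part finishes" corresponds to a backlog interval of 0, while the earlier workaround of a very large value (e.g. 365 days) behaves almost the same in practice.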

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread feng xiangyu
Hi Dong and Yunfeng, Thanks for the proposal, your FLIP sounds very useful from my perspective. In our business, when we use HybridSource in production, we also encountered the problem described in your FLIP. In our solution, we tend to skip making any checkpoints before all batch tasks have finished

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Hang Ruan
Thanks to Dong and Yunfeng for the work. The FLIP looks good to me. This new version is easier to understand. Best, Hang On Tue, Jun 27, 2023 at 16:53 Dong Lin wrote: > Thanks Jark, Jingsong, and Zhu for the review! > > Thanks Zhu for the suggestion. I have updated the configuration name as > suggested. > > On

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thanks Jark, Jingsong, and Zhu for the review! Thanks Zhu for the suggestion. I have updated the configuration name as suggested. On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu wrote: > Thanks Dong and Yunfeng for creating this FLIP and driving this discussion. > > The new design looks generally good

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Zhu Zhu
Thanks Dong and Yunfeng for creating this FLIP and driving this discussion. The new design looks generally good to me. Increasing the checkpoint interval when the job is processing backlogs is easier for users to understand and can help in more scenarios. I have one comment about the new

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jingsong Li
Looks good to me! Thanks Dong, Yunfeng and all for your discussion and design. Best, Jingsong On Tue, Jun 27, 2023 at 3:35 PM Jark Wu wrote: > > Thank you Dong for driving this FLIP. > > The new design looks good to me! > > Best, > Jark > > > On Jun 27, 2023, at 14:38, Dong Lin wrote: > > > > Thank you

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Jark Wu
Thank you Dong for driving this FLIP. The new design looks good to me! Best, Jark > On Jun 27, 2023, at 14:38, Dong Lin wrote: > > Thank you Leonard for the review! > > Hi Piotr, do you have any comments on the latest proposal? > > I am wondering if it is OK to start the voting thread this week. > >

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thank you Leonard for the review! Hi Piotr, do you have any comments on the latest proposal? I am wondering if it is OK to start the voting thread this week. On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu wrote: > Thanks Dong for driving this FLIP forward! > > Introducing `backlog status`

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-26 Thread Leonard Xu
Thanks Dong for driving this FLIP forward! Introducing a `backlog status` concept for Flink jobs makes sense to me for the following reasons: From a concept/API design perspective, it’s more general and natural than the above proposals, as it can be used in HybridSource for bounded records, CDC Source for

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-21 Thread Dong Lin
Hi Piotr, Thanks again for proposing the isProcessingBacklog concept. After discussing with Becket Qin and thinking about this more, I agree it is a better idea to add a top-level concept to all source operators to address the target use-case. The main reason that changed my mind is that

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-16 Thread Dong Lin
Hi Piotr, Thanks for the reply. Please see my comments inline. On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski wrote: > Hi Dong, > > > Suppose there are 1000 subtasks and each subtask has 1% chance of being > > "backpressured" at a given time (due to random traffic spikes). Then at > any > >

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-16 Thread Piotr Nowojski
Hi Dong, > Suppose there are 1000 subtasks and each subtask has 1% chance of being > "backpressured" at a given time (due to random traffic spikes). Then at any > given time, the chance of the job > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the > backpressure metric
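To make the quoted arithmetic concrete: if the job counts as backpressured whenever any subtask is, then with 1000 subtasks each backpressured 1% of the time, the probability that none is backpressured at a given instant is (1 - 0.01)^1000, roughly 4.3 × 10^-5. The sketch below assumes, for illustration only, that subtasks are backpressured independently of each other; the function name is hypothetical.

```python
def prob_no_subtask_backpressured(subtasks: int, p_backpressured: float) -> float:
    # Assumes each subtask is backpressured independently with probability p_backpressured.
    return (1.0 - p_backpressured) ** subtasks


p = prob_no_subtask_backpressured(1000, 0.01)
print(f"{p:.2e}")  # 4.32e-05
```

In other words, under an "any subtask" rule such a job would be reported as backpressured almost all the time, even though each individual subtask is rarely backpressured.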

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Dong Lin
Hi again Piotr, Thank you for the reply. Please see my reply inline. On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski wrote: > Hi again Dong, > > > I understand that JM will get the backpressure-related metrics every time > > the RestServerEndpoint receives the REST request to get these

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Piotr Nowojski
Hi again Dong, > I understand that JM will get the backpressure-related metrics every time > the RestServerEndpoint receives the REST request to get these metrics. But > I am not sure if RestServerEndpoint is already always receiving the REST > metrics at a regular interval (suppose there is no

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Dong Lin
Hi Piotr, Thanks for the explanations. I have some followup questions below. On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski wrote: > Hi All, > > Thanks for chipping in the discussion Ahmed! > > Regarding using the REST API. Currently I'm leaning towards implementing > this feature inside the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Piotr Nowojski
Hi All, Thanks for chipping in on the discussion, Ahmed! Regarding using the REST API. Currently I'm leaning towards implementing this feature inside Flink itself, via some pluggable interface. A REST API solution would be tempting, but I guess not everyone is using the Flink Kubernetes Operator.

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Ahmed Hamdy
Hi Dong, Thanks for the quick reply and for clarification, yeah that makes sense! Best Regards, Ahmed Hamdy On Fri, 2 Jun 2023 at 02:59, Dong Lin wrote: > Hi Ahmed, > > Thanks for the comments. > > I agree with you and Piotr that it would be useful to provide a more > generic approach to

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-01 Thread Dong Lin
Hi Ahmed, Thanks for the comments. I agree with you and Piotr that it would be useful to provide a more generic approach to address more use cases in one proposal. On the other hand, I also think it is important to make sure that the alternative (more generic) approach can indeed address the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-01 Thread Ahmed Hamdy
Hi Dong, Thanks for the great proposal. The thread is very intuitive along with suggestions from Jing and Piotr. As much as I like the simplicity of the proposed approach I think a much wider benefit is achieved by taking a more generic approach similar to Piotr's suggestion of having a

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-31 Thread Dong Lin
Hi Piotr, Thanks for the reply. Please see my comments inline. On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski wrote: > Hi Dong, > > First of all we don't need to send any extra signal from source (or non > source) operators. All of the operators are already reporting backpressured > metrics

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi Dong, First of all we don't need to send any extra signal from source (or non source) operators. All of the operators are already reporting backpressured metrics [1] and all of the metrics are already sent to JobManager. We would only need to pass some accessor to the metrics to the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Jing Ge
Hi Piotr, > But why do we need to have two separate mechanisms, if the dynamic > adjustment based on the backpressure/backlog would > achieve basically the same goal as your proposal and would solve both of > the problems? Having two independent solutions > in the same codebase, in the docs,

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Dong Lin
Hi Piotr, Thank you for providing those details. I understand you suggested using the existing "isBackPressured" signal to determine whether we should use the less frequent checkpointing interval. I followed your thoughts and tried to make it work. Below are the issues that I am not able to

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Piotr Nowojski
Hi again, Thanks Dong, yes I think your concerns are valid, and that's why I have previously refined my idea to use one of the backpressure measuring metrics that we already have. Either simply `isBackPressured == true` check [1], or `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2].
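Piotr's backpressure-based alternative can be sketched as a policy over the existing `backPressuredTimeMsPerSecond` task metric with the `N ~= 990` threshold from the email. How per-subtask values are combined is not specified there; the "any subtask" aggregation and the function names below are assumptions for illustration.

```python
from datetime import timedelta
from typing import Iterable

BACKPRESSURE_THRESHOLD_MS = 990  # the "N ~= 990" from the email


def subtask_backpressured(back_pressured_time_ms_per_second: float,
                          threshold_ms: float = BACKPRESSURE_THRESHOLD_MS) -> bool:
    # A subtask spending >= threshold ms of each second backpressured counts as backpressured.
    return back_pressured_time_ms_per_second >= threshold_ms


def choose_interval(per_subtask_ms: Iterable[float],
                    fast: timedelta, slow: timedelta) -> timedelta:
    # Hypothetical policy: use the slower interval while any subtask is backpressured.
    if any(subtask_backpressured(ms) for ms in per_subtask_ms):
        return slow
    return fast


print(choose_interval([10.0, 995.0], timedelta(seconds=30), timedelta(minutes=30)))  # 0:30:00
```

A threshold close to 1000 ms means only subtasks that are almost fully blocked count, which filters out the brief traffic spikes Dong raises in his reply.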

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
Let me correct the typo in the last paragraph as below: To make the problem even harder, the incoming traffic can be spiky. And the overhead of triggering checkpointing can be relatively low, in which case it might be more performant (w.r.t. e2e lag) for the Flink job to checkpoint at the more

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
Hi Piotrek, Thanks for providing more details of the alternative approach! If I understand your proposal correctly, here are the requirements for it to work without incurring any regression: 1) The source needs a way to determine whether there exists backpressure. 2) If there is backpressure,

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Piotr Nowojski
Hi @Jing > Your proposal to dynamically adjust the checkpoint intervals is elegant! It > makes sense to build it as a generic feature in Flink. Looking forward to > it. However, for some use cases, e.g. when users were aware of the bounded > sources (in the HybridSource) and care more about the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Jing Ge
Hi Dong, Hi Piotr, Thanks for the clarification. @Dong According to the code examples in the FLIP, I thought we are focusing on the HybridSource scenario. With the current HybridSource implementation, we don't even need to know the boundedness of sources in the HybridSource, since all sources

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Dong Lin
Hi Piotr, Thanks for the discussion. Please see my comments inline. On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski wrote: > Hi all, > > Thanks for the discussion. > > @Dong > > > In the target use-case, we would like HybridSource to trigger > > checkpoints more frequently when it is reading the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Piotr Nowojski
Hi all, Thanks for the discussion. @Dong > In the target use-case, we would like HybridSource to trigger > checkpoints more frequently when it is reading the Kafka Source (than when it > is reading the HDFS source). We would need to set a flag for the checkpoint > trigger to know which source the

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-24 Thread Dong Lin
Hi Jing, Thanks for your comments! Regarding the idea of using the existing "boundedness" attribute of sources, that is indeed something that we might find intuitive initially. I have thought about this idea, but could not find a good way to make it work. I will try to explain my thoughts and

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Jing Ge
Hi Yunfeng, Hi Dong, Thanks for the informative discussion! It is a rational requirement to set different checkpoint intervals for different sources in a HybridSource. The tiny downside of this proposal, at least for me, is that I have to understand the upper-bound definition of the interval and

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Dong Lin
Hi Piotr, Thanks for the comments. Let me try to understand your concerns and hopefully address the concerns. >> What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint trigger behaviour With the proposed change, there won't exist any
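One way to see why requests from several coordinators need not conflict: if each coordinator only states an upper bound on how long it is willing to wait between checkpoints (Jing's reply refers to an "upper-bound definition of the interval"), the smallest bound satisfies all of them at once. The sketch assumes that interpretation and treats the configured interval as the default bound; the names are hypothetical and this is not the FLIP's API.

```python
from datetime import timedelta
from typing import Iterable, Optional


def combined_interval(default: timedelta,
                      upper_bounds: Iterable[Optional[timedelta]]) -> timedelta:
    # None means the coordinator expresses no preference.
    bounds = [b for b in upper_bounds if b is not None]
    # The smallest upper bound (including the configured default) satisfies every request.
    return min([default, *bounds])


# A backlog source with no preference and a real-time source asking for 30 s.
print(combined_interval(timedelta(minutes=30), [None, timedelta(seconds=30)]))  # 0:00:30
```

Under this reading, the source that needs fresh checkpoints always wins, which is precisely the trade-off Piotr's question about a backlog source next to a real-time source probes.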

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Piotr Nowojski
Hi, Thanks for the proposal. However, are you sure that the OperatorCoordinator is the right place to place such logic? What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint trigger behaviour? If one source is processing a backlog and the other is

[DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-09 Thread Yunfeng Zhou
Hi all, Dong (cc'ed) and I are opening this thread to discuss our proposal to support dynamically triggering checkpoints from operators, which has been documented in FLIP-309. With the help of the ability proposed in