Re: [VOTE] FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment

2024-02-04 Thread Dong Lin
Thanks for the FLIP.

+1 (binding)

Best,
Dong

On Wed, Jan 31, 2024 at 11:41 AM Xuannan Su  wrote:

> Hi everyone,
>
> Thanks for all the feedback about the FLIP-331: Support
> EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute
> to optimize task deployment [1] [2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (excluding weekends, until Feb 5, 12:00 AM GMT) unless there is an
> objection or an insufficient number of votes.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
> [2] https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5
>
>
> Best,
> Xuannan
>


Re: Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2024-01-25 Thread Dong Lin
Thanks Xuannan for the update!

+1 (binding)

On Wed, Jan 10, 2024 at 5:54 PM Xuannan Su  wrote:

> Hi all,
>
> After several rounds of offline discussions with Xingtong and Jinhao,
> we have decided to narrow the scope of the FLIP. It will now focus on
> introducing OperatorAttributes that indicate whether an operator emits
> records only after inputs have ended. We will also use the attribute
> to optimize task scheduling for better resource utilization. Setting
> the backlog status and optimizing the operator implementation during
> the backlog will be deferred to future work.
>
> In addition to the change above, we have also made the following changes to
> the FLIP to address the problems mentioned by Dong:
> - Public interfaces are updated to reuse the GlobalWindows.
> - Instead of making all outputs of the upstream operators of the
> "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make
> the output of the operator with "isOutputOnlyAfterEndOfStream=true"
> blocking. This can prevent the second problem Dong mentioned. In the
> future, we may introduce an extra OperatorAttributes to indicate if an
> operator has any side output.
>
> I would greatly appreciate any comment or feedback you may have on the
> updated FLIP.
>
> Best regards,
> Xuannan
>
> On Tue, Sep 26, 2023 at 11:24 AM Dong Lin  wrote:
> >
> > Hi all,
> >
> > Thanks for the review!
> >
> > Becket and I discussed this FLIP offline and we agreed on several things
> > that need to be improved with this FLIP. I will summarize our discussion
> > with the problems and TODOs. We will update the FLIP and let you know
> once
> > the FLIP is ready for review again.
> >
> > 1) Investigate whether it is possible to update the existing
> GlobalWindows
> > in a backward-compatible way and re-use it for the same purpose
> > as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
> > class.
> >
> > Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance
> > which will not trigger window's computation even on end-of-inputs. We
> will
> > need to investigate its existing usage and see if we can re-use it in a
> > backward-compatible way.
> >
> > 2) Let JM know whether any operator in the upstream of the operator with
> > "isOutputOnEOF=true" will emit output via any side channel. The FLIP
> should
> > update the execution mode of those operators *only if* all outputs from
> > those operators are emitted only at the end of input.
> >
> > More specifically, the upstream operator might involve a user-defined
> > operator that might emit output directly to an external service, where
> the
> > emission operation is not explicitly expressed as an operator's output
> edge
> > and thus not visible to JM. Similarly, it is also possible for the
> > user-defined operator to register a timer
> > via InternalTimerService#registerEventTimeTimer and emit output to an
> > external service inside Triggerable#onEventTime. There is a chance that
> > users still need related logic to output data in real-time, even if the
> > downstream operators have isOutputOnEOF=true.
> >
> > One possible solution to address this problem is to add an extra
> > OperatorAttribute to specify whether this operator might output records
> in
> > such a way that does not go through operator's output (e.g. side output).
> > Then the JM can safely enable the runtime optimization currently
> described
> > in the FLIP when there is no such operator.
> >
> > 3) Create a follow-up FLIP that allows users to specify whether a source
> > with Boundedness=bounded should have isProcessingBacklog=true.
> >
> > This capability would effectively introduce a 3rd strategy to set backlog
> > status (in addition to FLIP-309 and FLIP-328). It might be useful to note
> > that, even though the data in bounded sources are backlog data in most
> > practical use-cases, it is not necessarily true. For example, users might
> > want to start a Flink job to consume real-time data from a Kafka topic
> and
> > specify that the job stops after 24 hours, which means the source is
> > technically bounded while the data is fresh/real-time.
> >
> > This capability is more generic and can cover more use-case than
> > EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
> > useful in cases where users already need to specify this window assigner
> in
> > a DataStream program, without bothering users to decide whether it is
> safe
> > to treat data in a bounded source as backlog data.

Re: Re: Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-12 Thread Dong Lin
+1 (binding)

On Sat, Jan 13, 2024 at 6:04 AM Austin Bennett  wrote:

> +1 (non-binding)
>
> On Fri, Jan 12, 2024 at 5:44 PM Becket Qin  wrote:
>
> > +1 (binding)
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Jan 12, 2024 at 5:58 AM Zhijiang wrote:
> >
> > > +1 (binding)
> > > Best,
> > > Zhijiang
> > > --
> > > From: Kurt Yang
> > > Send Time: Friday, January 12, 2024, 15:31
> > > To: dev
> > > Subject: Re: Re: Re: [VOTE] Accept Flink CDC into Apache Flink
> > > +1 (binding)
> > > Best,
> > > Kurt
> > > On Fri, Jan 12, 2024 at 2:21 PM Hequn Cheng  wrote:
> > > > +1 (binding)
> > > >
> > > > Thanks,
> > > > Hequn
> > > >
> > > > On Fri, Jan 12, 2024 at 2:19 PM godfrey he 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thanks,
> > > > > Godfrey
> > > > >
> > > > > > Zhu Zhu wrote on Fri, Jan 12, 2024 at 14:10:
> > > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu
> > > > > >
> > > > > > Hangxiang Yu wrote on Thu, Jan 11, 2024 at 14:26:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > On Thu, Jan 11, 2024 at 11:19 AM Xuannan Su <
> > suxuanna...@gmail.com
> > > >
> > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > On Thu, Jan 11, 2024 at 10:28 AM Xuyang 
> > > > wrote:
> > > > > > > > >
> > > > > > > > > +1 (non-binding)--
> > > > > > > > >
> > > > > > > > > Best!
> > > > > > > > > Xuyang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 2024-01-11 10:00:11, "Yang Wang" wrote:
> > > > > > > > > >+1 (binding)
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >Best,
> > > > > > > > > >Yang
> > > > > > > > > >
> > > > > > > > > >On Thu, Jan 11, 2024 at 9:53 AM liu ron <
> ron9@gmail.com
> > >
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> +1 non-binding
> > > > > > > > > >>
> > > > > > > > > >> Best
> > > > > > > > > >> Ron
> > > > > > > > > >>
> > > > > > > > > >> Matthias Pohl wrote on Wed, Jan 10, 2024 at 23:05:
> > > > > > > > > >>
> > > > > > > > > >> > +1 (binding)
> > > > > > > > > >> >
> > > > > > > > > >> > On Wed, Jan 10, 2024 at 3:35 PM ConradJam <
> > > > > jam.gz...@gmail.com>
> > > > > > > > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > +1 non-binding
> > > > > > > > > >> > >
> > > > > > > > > >> > > Dawid Wysakowicz wrote on Wed, Jan 10, 2024 at 21:06:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > +1 (binding)
> > > > > > > > > >> > > > Best,
> > > > > > > > > >> > > > Dawid
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Wed, 10 Jan 2024 at 11:54, Piotr Nowojski <
> > > > > > > > pnowoj...@apache.org>
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > +1 (binding)
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Wed, 10 Jan 2024 at 11:25, Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > > +1 (binding)
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > On Wed, Jan 10, 2024 at 4:43 AM Xingbo Huang <
> > > > > > > > hxbks...@gmail.com
> > > > > > > > > >> >
> > > > > > > > > >> > > > wrote:
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > +1 (binding)
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > Best,
> > > > > > > > > >> > > > > > > Xingbo
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > Dian Fu wrote on Wed, Jan 10, 2024 at 11:35:
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > > +1 (binding)
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > Regards,
> > > > > > > > > >> > > > > > > > Dian
> > > > > > > > > >> > > > > > > >
> > > > > > > > > >> > > > > > > > On Wed, Jan 10, 2024 at 5:09 AM Sharath <
> > > > > > > > > >> dsaishar...@gmail.com
> > > > > > > > > >> > >
> > > > > > > > > >> > > > > wrote:
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > +1 (non-binding)
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > Best,
> > > > > > > > > >> > > > > > > > > Sharath
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > On Tue, Jan 9, 2024 at 1:02 PM Venkata
> > > Sanath
> > > > > > > > Muppalla <
> > > > > > > > > >> > > > > > > > sanath...@gmail.com>
> > > > > > > > > >> > > > > > > > > wrote:
> > > > > > > > > >> > > > > > > > >
> > > > > > > > > >> > > > > > > > > > +1 (non-binding)
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > Thanks,
> > > > > > > > > >> > > > > > > > > > Sanath
> > > > > > > > > >> > > > > > > > > >
> > > > > > > > > >> > > > > > > > > > On Tue, Jan 9, 2024 at 11:16 AM Peter
> > > Huang
> > > > <
> > > > > > > > > >> > > > > > > > huangzhenqiu0...@gmail.com>
> > > > > > > > > 

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-04 Thread Dong Lin
Hi Lu,

I have not been actively working on Flink or this JIRA recently. If Xuannan does
not plan to work on this anytime soon, I personally think it will be great
if you can help work on this FLIP. Maybe we can start the voting thread if
there is no further comment on this FLIP.

Xuannan, what do you think?

Thanks,
Dong


On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:

> Hi,
>
> Is this still under active development? I notice
> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as
> deprioritized. If this is the case, would it be acceptable for us to take
> on the task?
>
> Best
> Lu
>
>
>
> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler 
> wrote:
>
>> Hi Dong,
>>
>> Sorry for not seeing this initially. I did have one question about the
>> description of the issue in the FLIP:
>>
>> > However, in cases where the upstream and downstream operators do not
>> store or access references to the input or output records, this deep-copy
>> overhead becomes unnecessary
>>
>> I was interested in getting clarification as to what you meant by “or
>> access references…”, to see if it covered this situation:
>>
>> StreamX —forward--> operator1
>> StreamX —forward--> operator2
>>
>> If operator1 modifies the record, and object re-use is enabled, then
>> operator2 will see the modified version, right?
>>
>> Thanks,
>>
>> — Ken
>>
>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
>> >
>> > Hi all,
>> >
>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> > add operator attribute to allow operator to specify support for
>> > object-reuse [1].
>> >
>> > Currently, the default configuration for pipeline.object-reuse is set
>> > to false to avoid data corruption, which can result in suboptimal
>> > performance. We propose adding APIs that operators can utilize to
>> > inform the Flink runtime whether it is safe to reuse the emitted
>> > records. This enhancement would enable Flink to maximize its
>> > performance using the default configuration.
>> >
>> > Please refer to the FLIP document for more details about the proposed
>> > design and implementation. We welcome any feedback and opinions on
>> > this proposal.
>> >
>> > Best regards,
>> >
>> > Dong and Xuannan
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot
>>
>>
>>
>>


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-04 Thread Dong Lin
Hi Ken,

Sorry for the late reply. I didn't notice this email from you until now.

In this scenario you described above, I don't think operator2 will see the
result modified by operator1. Note that object re-use applies only to the
transmission of data between operators in the same operator chain. But
Flink won't put StreamX, operator1 and operator2 in the same operator chain
when both operator1 and operator2 read the same output from StreamX.
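
For illustration, here is a minimal, self-contained sketch of the topology you describe, using only the existing DataStream API (enableObjectReuse() corresponds to pipeline.object-reuse=true). Whether operator2 observes operator1's in-place change then comes down to whether the operators end up in the same chain:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseForwardTopology {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Equivalent to setting pipeline.object-reuse=true.
            env.getConfig().enableObjectReuse();

            DataStream<int[]> streamX = env.fromElements(new int[]{1}, new int[]{2});

            // operator1 mutates the record in place.
            DataStream<int[]> operator1 = streamX.map(new MapFunction<int[], int[]>() {
                @Override
                public int[] map(int[] record) {
                    record[0] += 100;
                    return record;
                }
            });

            // operator2 reads the same upstream output as operator1.
            DataStream<Integer> operator2 = streamX.map(new MapFunction<int[], Integer>() {
                @Override
                public Integer map(int[] record) {
                    return record[0];
                }
            });

            operator1.print();
            operator2.print();
            env.execute("object-reuse-forward-topology");
        }
    }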

Would this answer your question?

Thanks,
Dong



On Fri, Oct 20, 2023 at 7:26 AM Ken Krugler 
wrote:

> Hi Dong,
>
> Sorry for not seeing this initially. I did have one question about the
> description of the issue in the FLIP:
>
> However, in cases where the upstream and downstream operators do not store
> or access references to the input or output records, this deep-copy
> overhead becomes unnecessary
>
>
> I was interested in getting clarification as to what you meant by “or
> access references…”, to see if it covered this situation:
>
> StreamX —forward--> operator1
> StreamX —forward--> operator2
>
> If operator1 modifies the record, and object re-use is enabled, then
> operator2 will see the modified version, right?
>
> Thanks,
>
> — Ken
>
> On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
>
> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> add operator attribute to allow operator to specify support for
> object-reuse [1].
>
> Currently, the default configuration for pipeline.object-reuse is set
> to false to avoid data corruption, which can result in suboptimal
> performance. We propose adding APIs that operators can utilize to
> inform the Flink runtime whether it is safe to reuse the emitted
> records. This enhancement would enable Flink to maximize its
> performance using the default configuration.
>
> Please refer to the FLIP document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on
> this proposal.
>
> Best regards,
>
> Dong and Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink & Pinot
>
>
>
>


Re: [VOTE] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-10-19 Thread Dong Lin
Thanks for the FLIP!

+1 (binding)

On Wed, Oct 18, 2023 at 10:25 AM Xuannan Su  wrote:

> Hi all,
>
> We would like to start the vote for FLIP-328: Allow source operators
> to determine isProcessingBacklog based on watermark lag [1]. This FLIP
> was discussed in this thread [2].
>
> The vote will be open until at least Oct 21st (at least 72 hours),
> following the consensus voting process.
>
> Cheers,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> [2] https://lists.apache.org/thread/kmtyqxsp35cobx3g0kwfn6ksxlq57m4t
>


Re: [VOTE] FLIP-329: Add operator attribute to specify support for object-reuse

2023-10-18 Thread Dong Lin
Thanks for the FLIP!

+1 (binding)

Xuannan Su wrote on Thu, Oct 19, 2023 at 10:30:

> Hi all,
>
> We would like to start the vote for FLIP-329: Add operator attribute
> to specify support for object-reuse[1]. This FLIP was discussed in
> this thread [2].
>
> The vote will be open until at least Oct 22nd (at least 72 hours),
> following the consensus voting process.
>
> Cheers,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> [2] https://lists.apache.org/thread/2h2r68m7bwsnvd8w1m50rktd7w6mr5n4
>


[RESULT][VOTE] FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data

2023-09-27 Thread Dong Lin
Hi all,

Thank you all for your review and the votes!

I am happy to announce that FLIP-327: Support switching from batch to
stream mode to improve throughput when processing backlog data [1] has been
accepted.

There are 4 binding votes and 3 non-binding votes [2]:

- Jing Ge (binding)
- Rui Fan (binding)
- Xintong Song (binding)
- Dong Lin (binding)
- Yuepeng Pan (non-binding)
- Venkatakrishnan Sowrirajan (non-binding)
- Ahmed Hamdy (non-binding)

There is no disapproving vote.

Cheers,
Dong

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-327
%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
[2] https://lists.apache.org/thread/7cj6pzx7w0ynqyogxgk668wjr322mcqw


Re: [VOTE] FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data

2023-09-27 Thread Dong Lin
Thank you all for the votes!

I will close the voting thread and summarize the result in a separate email.

On Tue, Sep 26, 2023 at 1:11 AM Ahmed Hamdy  wrote:

> +1(non binding)
> Best regards
> Ahmed Hamdy
>
> On Mon, 25 Sep 2023, 19:57 Venkatakrishnan Sowrirajan, 
> wrote:
>
> > +1 (non-binding)
> >
> > On Sun, Sep 24, 2023, 6:49 PM Xintong Song 
> wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Sat, Sep 23, 2023 at 10:16 PM Yuepeng Pan 
> > > wrote:
> > >
> > > > +1(non-binding), thank you for driving this proposal.
> > > >
> > > > Best,
> > > > Yuepeng Pan.
> > > > At 2023-09-22 14:07:45, "Dong Lin"  wrote:
> > > > >Hi all,
> > > > >
> > > > >We would like to start the vote for FLIP-327: Support switching from
> > > batch
> > > > >to stream mode to improve throughput when processing backlog data
> [1].
> > > > This
> > > > >FLIP was discussed in this thread [2].
> > > > >
> > > > >The vote will be open until at least Sep 27th (at least 72
> > > > >hours), following the consensus voting process.
> > > > >
> > > > >Cheers,
> > > > >Xuannan and Dong
> > > > >
> > > > >[1]
> > > > >
> > > >
> > >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-327*3A*Support*switching*from*batch*to*stream*mode*to*improve*throughput*when*processing*backlog*data__;JSsrKysrKysrKysrKysr!!IKRxdwAv5BmarQ!ZP19r7-3IBaBI-kV0olZvaz5a2TFN3uxge2TJM7WQvovjfRbOl71NaC3SEh_UEJmH7Lssqu0bx4FKResPPc7$
> > > > >[2]
> > >
> >
> https://urldefense.com/v3/__https://lists.apache.org/thread/29nvjt9sgnzvs90browb8r6ng31dcs3n__;!!IKRxdwAv5BmarQ!ZP19r7-3IBaBI-kV0olZvaz5a2TFN3uxge2TJM7WQvovjfRbOl71NaC3SEh_UEJmH7Lssqu0bx4FKZEAz9yp$
> > > >
> > >
> >
>


Re: Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-09-25 Thread Dong Lin
Hi all,

Thanks for the review!

Becket and I discussed this FLIP offline and we agreed on several things
that need to be improved with this FLIP. I will summarize our discussion
with the problems and TODOs. We will update the FLIP and let you know once
the FLIP is ready for review again.

1) Investigate whether it is possible to update the existing GlobalWindows
in a backward-compatible way and re-use it for the same purpose
as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
class.

Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance
which will not trigger window's computation even on end-of-inputs. We will
need to investigate its existing usage and see if we can re-use it in a
backward-compatible way.
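
To make the first point concrete, the snippet below shows how GlobalWindows is used today: since its default trigger is a NeverTrigger, the user must supply an explicit trigger or the window never fires. The commented-out variant at the end is only a hypothetical sketch of what re-using GlobalWindows for the EndOfStreamWindows purpose could look like; it is not an existing API.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

    // Inside a job's main(): GlobalWindows#getDefaultTrigger returns a NeverTrigger,
    // so an explicit trigger is required for the window to ever fire.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> values = env.fromSequence(0, 999);

    DataStream<Long> partialSums = values
            .keyBy(v -> v % 10)
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(CountTrigger.of(100)))  // without this, nothing is emitted
            .reduce(Long::sum);

    // Hypothetical re-use of GlobalWindows with an end-of-input trigger (method
    // name made up for illustration; this is what the investigation is about):
    // values.keyBy(v -> v % 10)
    //       .window(GlobalWindows.createWithEndOfStreamTrigger())
    //       .reduce(Long::sum);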

2) Let JM know whether any operator in the upstream of the operator with
"isOutputOnEOF=true" will emit output via any side channel. The FLIP should
update the execution mode of those operators *only if* all outputs from
those operators are emitted only at the end of input.

More specifically, the upstream operator might involve a user-defined
operator that might emit output directly to an external service, where the
emission operation is not explicitly expressed as an operator's output edge
and thus not visible to JM. Similarly, it is also possible for the
user-defined operator to register a timer
via InternalTimerService#registerEventTimeTimer and emit output to an
external service inside Triggerable#onEventTime. There is a chance that
users still need related logic to output data in real-time, even if the
downstream operators have isOutputOnEOF=true.

One possible solution to address this problem is to add an extra
OperatorAttribute to specify whether this operator might output records in
such a way that does not go through operator's output (e.g. side output).
Then the JM can safely enable the runtime optimization currently described
in the FLIP when there is no such operator.
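
Purely as an illustration of what such an attribute could look like (all names below are hypothetical and not part of any accepted API), the idea is roughly:

    // Hypothetical sketch only: the actual OperatorAttributes API is still being
    // designed and may end up with different names and semantics.
    public class OperatorAttributes {
        // True if the operator only emits records after all its inputs have ended.
        private final boolean outputOnEOF;
        // True if the operator may emit records through a channel that is not a regular
        // output edge (e.g. a side output or a direct write to an external service).
        private final boolean hasSideOutput;

        OperatorAttributes(boolean outputOnEOF, boolean hasSideOutput) {
            this.outputOnEOF = outputOnEOF;
            this.hasSideOutput = hasSideOutput;
        }

        public boolean isOutputOnEOF() {
            return outputOnEOF;
        }

        public boolean hasSideOutput() {
            return hasSideOutput;
        }
    }

The JM would then only apply the runtime optimization when the operator reports isOutputOnEOF() == true and none of its upstream operators report hasSideOutput() == true.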

3) Create a follow-up FLIP that allows users to specify whether a source
with Boundedness=bounded should have isProcessingBacklog=true.

This capability would effectively introduce a 3rd strategy to set backlog
status (in addition to FLIP-309 and FLIP-328). It might be useful to note
that, even though the data in bounded sources are backlog data in most
practical use-cases, it is not necessarily true. For example, users might
want to start a Flink job to consume real-time data from a Kafka topic and
specify that the job stops after 24 hours, which means the source is
technically bounded while the data is fresh/real-time.

This capability is more generic and can cover more use-case than
EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
useful in cases where users already need to specify this window assigner in
a DataStream program, without bothering users to decide whether it is safe
to treat data in a bounded source as backlog data.
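
To make the Kafka example concrete, such a bounded-but-fresh source could be declared roughly as below (a sketch using the existing KafkaSource builder; whether it should report isProcessingBacklog=true is exactly what the follow-up FLIP would let users decide):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    // Roughly "read fresh data for the next 24 hours, then stop": the source starts
    // from the latest offsets (real-time data) but is technically bounded.
    long stopAt = System.currentTimeMillis() + 24 * 60 * 60 * 1000L;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("events")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setBounded(OffsetsInitializer.timestamp(stopAt))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();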


Regards,
Dong






On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan  wrote:

> Hi, Dong,
> Thanks for your efforts.
>
> +1 to this proposal,
> I believe this will improve the performance in some mixture circumstances
> of bounded and unbounded workloads.
>
> Best,
> Yuxin
>
>
> Xintong Song  于2023年9月18日周一 10:56写道:
>
> > Thanks for addressing my comments, Dong.
> >
> > LGTM.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu 
> wrote:
> >
> > > Hi Dong & Jinhao,
> > >
> > > Thanks for your clarification! +1
> > >
> > > Best regards,
> > > Wencong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > At 2023-09-15 11:26:16, "Dong Lin"  wrote:
> > > >Hi Wencong,
> > > >
> > > >Thanks for your comments! Please see my reply inline.
> > > >
> > > >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu 
> > > wrote:
> > > >
> > > >> Dear Dong,
> > > >>
> > > >> I have thoroughly reviewed the proposal for FLIP-331 and believe it
> > > would
> > > >> be
> > > >> a valuable addition to Flink. However, I do have a few questions
> that
> > I
> > > >> would
> > > >> like to discuss:
> > > >>
> > > >>
> > > >> 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented
> by
> > > >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
> > > >> supports WindowedStream and AllWindowedStream to process all records
>

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-25 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to explain it below.

Overall, the two competing options differ in how an invocation of
`#setIsProcessingBacklog(false)` affects the backlog status for the given
source (corresponding to the SplitEnumeratorContext instance on which this
method is invoked).

- With my approach, setIsProcessingBacklog(false) merely unsets the effects of
any previous invocation of setIsProcessingBacklog(..) on the given source,
without necessarily forcing the source's backlog status to be false.
- With Jark’s approach, setIsProcessingBacklog(false) forces the source's
backlog status to be false.

There is no practical difference between these two options as of FLIP-309.
However, once we introduce additional strategy (e.g. job-level config) to
configure backlog status in FLIP-328, there will be tricky and important
differences between them.

More specifically, let’s say we want to introduce a job-level config such
> as "pipeline.backlog.watermark-lag-threshold" as mentioned in FLIP-328:

- With Jack’s approach, if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, then that effectively means
isProcessingBacklog=false even if watermark lag exceeds the configured
threshold, preventing job-level config from taking effect during the
"unbounded phase".
- With my approach, even if MySQL CDC invokes setIsProcessingBacklog(false)
at the beginning of the “unbounded phase”, the source can still have
isProcessingBacklog=true when watermark lag is too high.
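
As a rough illustration of where these calls come from (the enumerator class and its phase callbacks are made up; only the setIsProcessingBacklog(boolean) method from FLIP-309 is assumed):

    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;

    // Hypothetical MySQL-CDC-like enumerator sketch.
    class CdcLikeEnumerator<SplitT extends SourceSplit> {
        private final SplitEnumeratorContext<SplitT> context;

        CdcLikeEnumerator(SplitEnumeratorContext<SplitT> context) {
            this.context = context;
        }

        void onSnapshotPhaseStarted() {
            // Historical snapshot data is backlog by definition.
            context.setIsProcessingBacklog(true);
        }

        void onBinlogPhaseStarted() {
            // With my approach, this call only clears the effect of the earlier
            // setIsProcessingBacklog(true); another strategy (e.g. the job-level
            // watermark-lag threshold) can still report backlog=true.
            // With Jark's approach, this call would force the source's backlog
            // status to false, overriding the watermark-lag strategy.
            context.setIsProcessingBacklog(false);
        }
    }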

Would this clarify the difference between these two options?

Regards,
Dong


On Mon, Sep 25, 2023 at 5:15 PM Piotr Nowojski 
wrote:

> Hi Jark and Dong,
>
> I'm a bit confused about the difference between the two competing options.
> Could one of you elaborate what's the difference between:
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
>
> and
>
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have isProcessingBacklog=false.
>
> ?
>
> Best,
> Piotrek
>
> On Thu, 21 Sep 2023 at 15:28, Dong Lin wrote:
>
> > Hi all,
> >
> > Jark and I discussed this FLIP offline and I will summarize our
> discussion
> > below. It would be great if you could provide your opinion of the
> proposed
> > options.
> >
> > Regarding the target use-cases:
> > - We both agreed that MySQL CDC should have backlog=true when
> watermarkLag
> > is large during the binlog phase.
> > - Dong argued that other streaming sources with watermarkLag defined
> (e.g.
> > Kafka) should also have backlog=true when watermarkLag is large. The
> > pros/cons discussion below assumes this use-case needs to be supported.
> >
> > The 1st option is what is currently proposed in FLIP-328, with the
> > following key characteristics:
> > 1) There is one job-level config (i.e.
> > pipeline.backlog.watermark-lag-threshold) that applies to all sources
> with
> > watermarkLag metric defined.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> > the effect of the previous invocation (if any) of
> > `#setIsProcessingBacklog(true)` on the given source instance.
> >
> > The 2nd option is what Jark proposed in this email thread, with the
> > following key characteristics:
> > 1) Add source-specific config (both Java API and SQL source property) to
> > every source for which we want to set backlog status based on the
> > watermarkLag metric. For example, we might add separate Java APIs
> > `#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
> > KafkaSource, PulsarSource etc.
> > 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> > source instance will have isProcessingBacklog=false.
> >
> > Here are the key pros/cons of these two options.
> >
> > Cons of the 1st option:
> > 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> > understand for Flink operator developers than the corresponding semantics
> > in option-2.
> >
> > Cons of the  2nd option:
> > 1) More work for end-users. For a job with multiple sources that need to
> be
> > configured with a watermark lag threshold, users need to specify multiple
> > configs (one for each source) instead of specifying one job-level config.
> >
> > 2) More work for Flink operator developers. Overall there are more public
> > APIs (one Java API and one SQL property for each source that needs to
> > determine backlog based on watermark) exposed to end users. This also
> adds
> >

[VOTE] FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data

2023-09-22 Thread Dong Lin
Hi all,

We would like to start the vote for FLIP-327: Support switching from batch
to stream mode to improve throughput when processing backlog data [1]. This
FLIP was discussed in this thread [2].

The vote will be open until at least Sep 27th (at least 72
hours), following the consensus voting process.

Cheers,
Xuannan and Dong

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
[2] https://lists.apache.org/thread/29nvjt9sgnzvs90browb8r6ng31dcs3n


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-21 Thread Dong Lin
Hi all,

Jark and I discussed this FLIP offline and I will summarize our discussion
below. It would be great if you could provide your opinion of the proposed
options.

Regarding the target use-cases:
- We both agreed that MySQL CDC should have backlog=true when watermarkLag
is large during the binlog phase.
- Dong argued that other streaming sources with watermarkLag defined (e.g.
Kafka) should also have backlog=true when watermarkLag is large. The
pros/cons discussion below assumes this use-case needs to be supported.

The 1st option is what is currently proposed in FLIP-328, with the
following key characteristics:
1) There is one job-level config (i.e.
pipeline.backlog.watermark-lag-threshold) that applies to all sources with
watermarkLag metric defined.
2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
the effect of the previous invocation (if any) of
`#setIsProcessingBacklog(true)` on the given source instance.

The 2nd option is what Jark proposed in this email thread, with the
following key characteristics:
1) Add source-specific config (both Java API and SQL source property) to
every source for which we want to set backlog status based on the
watermarkLag metric. For example, we might add separate Java APIs
`#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
KafkaSource, PulsarSource etc.
2) The semantics of `#setIsProcessingBacklog(false)` is that the given
source instance will have isProcessingBacklog=false.

Here are the key pros/cons of these two options.

Cons of the 1st option:
1) The semantics of `#setIsProcessingBacklog(false)` is harder to
understand for Flink operator developers than the corresponding semantics
in option-2.

Cons of the  2nd option:
1) More work for end-users. For a job with multiple sources that need to be
configured with a watermark lag threshold, users need to specify multiple
configs (one for each source) instead of specifying one job-level config.

2) More work for Flink operator developers. Overall there are more public
APIs (one Java API and one SQL property for each source that needs to
determine backlog based on watermark) exposed to end users. This also adds
more burden for the Flink community to maintain these APIs.

3) It would be hard (e.g. require backward incompatible API change) to
extend the Flink runtime to support job-level config to set watermark
strategy in the future (e.g. support the
pipeline.backlog.watermark-lag-threshold in option-1). This is because an
existing source operator's code might have hardcoded an invocation of
`#setIsProcessingBacklog(false)`, which means the backlog status must be
set to false, which prevents the Flink runtime from setting backlog=true when a
new strategy is triggered.

Overall, I am still inclined to choose option-1 because it is more
extensible and simpler to use in the long term when we want to support/use
multiple sources whose backlog status can change based on the watermark
lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
understand than option-2, I think this overhead/cost is worthwhile as it
makes end-users' life easier in the long term.
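
To illustrate the difference from the user side (the config key is the one proposed in FLIP-328; the per-source setter in option-2 is hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Option 1: one job-level config that applies to every source exposing the
    // watermarkLag metric (key as proposed in FLIP-328).
    Configuration conf = new Configuration();
    conf.setString("pipeline.backlog.watermark-lag-threshold", "10 min");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

    // Option 2 (hypothetical per-source API, one setter per connector), e.g.:
    // KafkaSource.<String>builder()
    //         ...
    //         .setWatermarkLagThreshold(Duration.ofMinutes(10))
    //         .build();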

Jark: thank you for taking the time to review this FLIP. Please feel free
to comment if I missed anything in the pros/cons above.

Jark and I have not reached agreement on which option is better. It will be
really helpful if we can get more comments on these options.

Thanks,
Dong


On Tue, Sep 19, 2023 at 11:26 AM Dong Lin  wrote:

> Hi Jark,
>
> Thanks for the reply. Please see my comments inline.
>
> On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:
>
>> Hi Dong,
>>
>> Sorry for the late reply.
>>
>> > The rationale is that if there is any strategy that is triggered and
>> says
>> > backlog=true, then job's backlog should be true. Otherwise, the job's
>> > backlog status is false.
>>
>> I'm quite confused about this. Does that mean, if the source is in the
>> changelog phase, the source has to continuously invoke
>> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
>> the job's backlog status would be set to false by the framework?
>>
>
> No, the source would not have to continuously invoke
> setIsProcessingBacklog(true) in an infinite loop.
>
> Actually, I am not very sure why there is confusion that "the job's
> backlog status would be set to false by the framework". Could you explain
> where that comes from?
>
> I guess it might be useful to provide a complete overview of how
> setIsProcessingBacklog(...)
> and pipeline.backlog.watermark-lag-threshold work together to determine the
> overall job's backlog status. Let me explain it below.
>
> Here is the semantics/behavior of setIsProcessingBacklog(..).
> - This method is invoked on a per-source basis.
> - This method can be invoked multiple times with true/false as its input
> param

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-09-19 Thread Dong Lin
Hi all,

Thank you for the comments!

If there is no further comment, we will open the voting thread in 3 days.

Cheers,
Xuannan and Dong


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-18 Thread Dong Lin
Hi Jark,

Thanks for the reply. Please see my comments inline.

On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:

> Hi Dong,
>
> Sorry for the late reply.
>
> > The rationale is that if there is any strategy that is triggered and says
> > backlog=true, then job's backlog should be true. Otherwise, the job's
> > backlog status is false.
>
> I'm quite confused about this. Does that mean, if the source is in the
> changelog phase, the source has to continuously invoke
> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> the job's backlog status would be set to false by the framework?
>

No, the source would not have to continuously invoke
setIsProcessingBacklog(true) in an infinite loop.

Actually, I am not very sure why there is confusion that "the job's backlog
status would be set to false by the framework". Could you explain where
that comes from?

I guess it might be useful to provide a complete overview of how
setIsProcessingBacklog(...)
and pipeline.backlog.watermark-lag-threshold work together to determine the
overall job's backlog status. Let me explain it below.

Here is the semantics/behavior of setIsProcessingBacklog(..).
- This method is invoked on a per-source basis.
- This method can be invoked multiple times with true/false as its input
parameter.
- For a given source, the last invocation of this method overwrites the
effect of earlier invocation of this method.
- For a given source, if this method has been invoked at least once and the
last invocation of this method has isProcessingBacklog = true, then it is
guaranteed that the backlog status of this source is set to true.

Here is the semantics/behavior of pipeline.backlog.watermark-lag-threshold:
- This config is specified at the job level and applies to every source
included in this job.
- For a given source, if its watermarkLag metric is available (see
FLIP-33) and watermarkLag > watermark-lag-threshold, then it is guaranteed
that backlog status of this source is set to true.

Here is how the source's backlog status is determined: If a rule specified
above says the source's backlog status should be true, then the source's
backlog status is set to true. Otherwise, it is set to false.

Here is how the job's backlog status is determined: If there exists a source
that is currently running and the source's backlog status is set to true,
then the job's backlog status is set to true. Otherwise, it is set to false.
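
Put as non-normative pseudo-Java, the rules above boil down to the following (SourceState is a placeholder for whatever per-source bookkeeping the runtime keeps; this is not actual Flink code):

    import java.util.List;

    final class BacklogRules {

        interface SourceState {
            Boolean lastSetIsProcessingBacklogValue(); // null if never invoked
            boolean watermarkLagAvailable();
            long watermarkLagMillis();
        }

        static boolean sourceIsBacklog(SourceState s, long lagThresholdMillis) {
            boolean setterSaysTrue = Boolean.TRUE.equals(s.lastSetIsProcessingBacklogValue());
            boolean watermarkRuleSaysTrue =
                    s.watermarkLagAvailable() && s.watermarkLagMillis() > lagThresholdMillis;
            // A source is backlog if ANY rule says so; no rule can force it to false.
            return setterSaysTrue || watermarkRuleSaysTrue;
        }

        static boolean jobIsBacklog(List<SourceState> runningSources, long lagThresholdMillis) {
            // The job is backlog if any currently running source is backlog.
            return runningSources.stream().anyMatch(s -> sourceIsBacklog(s, lagThresholdMillis));
        }
    }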

Hopefully this can help clarify the behavior. If needed, we can update the
relevant doc (e.g. setIsProcessingBacklog() Java doc) to make semantics
above clearer for Flink users.

And I would be happy to discuss/explain this design offline when you have
time.

Thanks,
Dong


>
> Best,
> Jark
>
> On Tue, 19 Sept 2023 at 09:13, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Do you have time to comment on whether the current design looks good?
> >
> > I plan to start voting in 3 days if there is no follow-up comment.
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > > Note that we can not simply enforce the semantics of "any invocation
> of
> > > > setIsProcessingBacklog(false) will set the job's backlog status to
> > > false".
> > > > Suppose we have a job with two operators, where operatorA invokes
> > > > setIsProcessingBacklog(false) and operatorB invokes
> > > > setIsProcessingBacklog(true). There will be conflict if we use the
> > > > semantics of "any invocation of setIsProcessingBacklog(false) will
> set
> > > the
> > > > job's backlog status to false".
> > >
> > > So it should set job's backlog status to false if the job has only a
> > single
> > > source,
> > > right?
> > >
> > > Could you elaborate on the behavior if there is a job with a single
> > source,
> > > and the watermark lag exceeds the configured value (should set backlog
> to
> > > true?),
> > > but the source invokes "setIsProcessingBacklog(false)"? Or the inverse
> > one,
> > > the source invokes "setIsProcessingBacklog(false)" first, but the
> > watermark
> > > lag
> > > exceeds the configured value.
> > >
> > > This is the conflict I'm concerned about.
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > Please see my comments inline.
> > > >
> > > > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> > > >

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-18 Thread Dong Lin
Hi Jark,

Do you have time to comment on whether the current design looks good?

I plan to start voting in 3 days if there is no follow-up comment.

Thanks,
Dong


On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
>
> So it should set job's backlog status to false if the job has only a single
> source,
> right?
>
> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?),
> but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
> the source invokes "setIsProcessingBacklog(false)" first, but the watermark
> lag
> exceeds the configured value.
>
> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have
> different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderliness that users expect (or are willing to wait for) for
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly
> > > > specify when backlog is false. It is intentionally specified in such
> a
> > > way
> > > > that there will  not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> > > Is this mentioned in FLIP-309? This is completely different from what I
> > >
> >
> > Can you explain what you mean by "allow to specify backlog to be false"?
> >
> > If what you mean is that "can invoke setIsProcessingBacklog(false)", then
> > FLIP-309 supports doing this.
> >
> > If what you mean is that "any invocation of setIsProcessingBacklog(false)
> > will set the job's backlog status to false", then FLIP-309 does not
> support
> > this. I believe the existing Java doc of this API and FLIP-309 is
> > compatible with this explanation.
> >
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
> >
> > Would this answer your question?
> >
> > Best,
> > Dong
> >
> >
> > > understand. From the API interface
> "ctx.setIsProcessing

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-09-18 Thread Dong Lin
Hi Xintong,

Thank you for all the comments. Please see my reply inline.


On Mon, Sep 18, 2023 at 11:31 AM Xintong Song  wrote:

> Thanks for addressing my comments, Dong.
>
> The expected behavior of checkpointing and failover depends on whether
> > there is any operator currently running in the job with all its inputs'
> > isBacklog=true. If there exists such an operator and
> > interval-during-backlog = 0, then checkpoint will be disabled and the
> > operator will have to failover in a way similar to batch mode.
>
>
> This makes sense to me. Shall we also put this into the FLIP. Or maybe you
> already did that and I overlooked it? The current description in "4)
> Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me.
> It says "At the point when isBacklog switches to false, source operator
> ...", which sounds like upon any source operator switching to isBacklog =
> false.
>


I think it is kind of mentioned in the doc
of execution.checkpointing.interval-during-backlog, which says "if it is
not null and any source reports isProcessingBacklog=true, it is the
interval...".

Based on this doc, we can derive that if there is at least one source reporting
isBacklog=true, then the checkpointing interval is determined by
interval-during-backlog, which in this case has value 0 indicating that
checkpoint triggering is disabled.

Given that other readers might also have this question, I have updated the
FLIP-327 with the following statement to make it more explicit: "For jobs
with multiple sources and execution.checkpointing.interval-during-backlog =
0, checkpoint triggering is enabled if and only if all sources have
isBacklog=false".



> I am not sure what is the concern with having `flink-streaming-java` depend
> > on `flink-runtime`. Can you clarify the exact concern?
> >
>
> The concern here is that an API module should not depend on a runtime
> module. Currently, we have the "user codes -> flink-streaming-java ->
> flink-runtime" dependency chain, which makes binary compatibility
> impossible because any runtime changes can break the compatibility with a
> user jar (which bundles flink-streaming-java) compiled for an older
> version. Ideally, we want the runtime module to depend on the API module,
> rather than the other way around. This is one of the issues we are trying
> to resolve with the programmatic API refactor. However, the way we are
> trying to resolve it is to introduce another API module and gradually
> replace the current DataStream API / flink-streaming-java, which means
> flink-streaming-java will stay depending on flink-runtime for a while
> anyway. So the concern here is minor, only about we might need more effort
> when reworking this with the new API.
>

Thanks for the detailed explanation. Given that we plan to avoid having
flink-streaming-java depend on flink-runtime, I agree it is preferred to
avoid introducing more dependencies like this.

I have updated the FLIP to let RecordAttributes extend StreamElement.
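
For context, the resulting shape is roughly the following (a sketch based on this discussion; the exact fields and names are defined in the FLIP and may differ):

    import org.apache.flink.streaming.runtime.streamrecord.StreamElement;

    // Rough sketch only.
    public final class RecordAttributes extends StreamElement {
        // Whether the records following this element are backlog records.
        private final boolean backlog;

        public RecordAttributes(boolean backlog) {
            this.backlog = backlog;
        }

        public boolean isBacklog() {
            return backlog;
        }
    }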

Best,
Dong


> The rest of your replies make sense to me.
>
> Best,
>
> Xintong
>
>
>
> On Fri, Sep 15, 2023 at 10:05 PM Dong Lin  wrote:
>
> > Hi Xintong,
> >
> > Thanks for your comments! Please see my reply inline.
> >
> > On Thu, Sep 14, 2023 at 4:58 PM Xintong Song 
> > wrote:
> >
> > > Sorry to join the discussion late.
> > >
> > > Overall, I think it's a good idea to support dynamically switching the
> > > operator algorithms between Streaming (optimized towards low latency +
> > > checkpointing supports) and Batch (optimized towards throughput). This
> is
> > > indeed a big and complex topic, and I really appreciate the previous
> > > discussions that narrow the scope of this FLIP down to only considering
> > > switching from Batch to Streaming as a first step.
> > >
> > > I have several questions.
> > >
> > > 1. The FLIP discusses various behaviors under 4 scenarios: streaming
> > mode,
> > > batch mode, mixed mode with checkpoint interval > 0, mixed mode with
> > > checkpoint interval = 0. IIUC, this is because many batch optimizations
> > > cannot be supported together with checkpointing. This justifies that in
> > > mixed mode with interval > 0, most behaviors are the same as in
> streaming
> > > mode. However, mixed mode with checkpoint interval = 0 does not always
> > > necessarily mean we should apply such optimization. It is possible that
> > in
> > > some cases (likely with small data amounts) the cost of such
> > optimizations
> > > are higher than the benefit. Therefore, I'd sugge

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-09-15 Thread Dong Lin
 modified so the watermark does not move to +inf, but to
> > min(streaming watermark). Giving these properties, it should be possible
> > to exchange batch and streaming processing without any cooperation with
> > the application logic itself. Is my understanding correct?
> >
> > If so, there is still one open question to efficiency, though. The
> > streaming operator _might_ need sorting by timestamp (e.g. processing
> > time-series data, or even sequential data). In that case simply
> > switching streaming semantics to batch processing does not yield
> > efficient processing, because the operator still needs to buffer and
> > manually sort all the input data (batch data is always unordered). On
> > the other hand, the batch runner already does sorting (for grouping by
> > key), so adding additional sorting criterion is very cheap. In Apache
> > Beam, we introduced a property of a stateful PTransform (DoFn) called
> > @RequiresTimeSortedInput [1], which can then be implemented efficiently
> > by batch engines.
> >
> > Does the FLIP somehow work with conditions i) and ii)? I can imagine for
> > instance that if data is read from say Kafka, then if backlog gets
> > sufficiently large, then even the batch processing can take substantial
> > time and if it fails after long processing, some of the original data
> > might be already rolled out from Kafka topic.
> >
> > In the FLIP there are some proposed changes to sources to emit metadata
> > about if the records come from backlog. What is the driving line of
> > thoughts why this is needed? In my point of view, streaming engines are
> > _always_ processing backlog, the only question is "how delayed are the
> > currently processed events after HEAD", or more specifically in this
> > case "how many elements can we expect to process if the source would
> > immediately stop receiving more data?". This should be configurable
> > using simple option defining the difference between current
> > processing-time (JM) and watermark of the source, or am I missing
> > something?
> >
> > Thanks for clarification and all the best,
> >
> >   Jan
> >
> > [1]
> >
> >
> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> >
> > On 8/31/23 13:17, Xuannan Su wrote:
> > > Hi all,
> > >
> > > I would like to share some updates on FLIP-327. Dong and I have had a
> > > series of discussions and have made several refinements to the FLIP.
> > >
> > > The major change to the FLIP is to allow the input of the one-input
> > > operator to be automatically sorted during backlog processing. When
> > > combined with the state backend optimization introduced in FLIP-325
> [1],
> > > all the keyed single-input operators can achieve similar performance as
> > in
> > > batch mode during backlog processing without any code change to the
> > > operator. We also implemented a POC[2] and conducted benchmark[3] using
> > the
> > > KeyedStream#reduce operation. The benchmark results demonstrate the
> > > performance gains that this FLIP can offer.
> > >
> > > I am looking forward to any comments or feedback you may have on this
> > FLIP.
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
> > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
> > > [3]
> > >
> >
> https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java
> > >
> > >
> > >
> > >> On Aug 18, 2023, at 21:28, Dong Lin  wrote:
> > >>
> > >> Hi Piotr,
> > >>
> > >> Thanks for the explanation.
> > >>
> > >> To recap our offline discussion, there is a concern regarding the
> > >> capability to dynamically switch between stream and batch modes. This
> > >> concern is around unforeseen behaviors such as bugs or performance
> > >> regressions, which we might not yet be aware of yet. The reason for
> this
> > >> concern is that this feature involves a fundamental impact on the
> Flink
> > >> runtime's behavior.
> > >>
> > >> Due to the above concern, I agree it is reasonable to annotate related
> >

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-15 Thread Dong Lin
Hi Jark,

Please see my reply inline.

On Fri, Sep 15, 2023 at 2:01 PM Jark Wu  wrote:

> Hi Dong,
>
> > Note that we can not simply enforce the semantics of "any invocation of
> > setIsProcessingBacklog(false) will set the job's backlog status to
> false".
> > Suppose we have a job with two operators, where operatorA invokes
> > setIsProcessingBacklog(false) and operatorB invokes
> > setIsProcessingBacklog(true). There will be conflict if we use the
> > semantics of "any invocation of setIsProcessingBacklog(false) will set
> the
> > job's backlog status to false".
>
> So it should set job's backlog status to false if the job has only a single
> source,
> right?
>

As of FLIP-309's implementation, with the constraint that there is only
one API to set backlog=true, your understanding is correct.

However, it might be useful to recall that FLIP-309 is proposed with the
explicit plan to add more strategies to set backlog=true, which is
documented in its future work section. The above statement, which assumes
there is only one strategy, is not useful in the long term.

With this knowledge in mind, a better and more long-lasting way to
interpret the semantics of setIsProcessingBacklog() would be this: "it
should set job's backlog status to false if the job has only a single
source AND there is no other strategy to set backlog=true".


> Could you elaborate on the behavior if there is a job with a single source,
> and the watermark lag exceeds the configured value (should set backlog to
> true?),
> but the source invokes "setIsProcessingBacklog(false)"? Or the inverse one,
> the source invokes "setIsProcessingBacklog(false)" first, but the watermark
> lag
> exceeds the configured value.
>

Suppose setIsProcessingBacklog(false) is invoked and the watermark lag exceeds
the configured value; then the job's backlog status is set to true.

The rationale is that if there is any strategy that is triggered and says
backlog=true, then job's backlog should be true. Otherwise, the job's
backlog status is false.

The key idea to avoid conflict is that we will not add any API with the
capability to explicitly set job's backlog status to false. A job's backlog
status is false if and only if no rule says it should be set to true.

BTW, I can understand that setIsProcessingBacklog(false) appears to suggest
the job's backlog is set to false. We can certainly update its Java doc to
make the semantics clearer. If you have any suggestion on a better name for
this method, we can also update it accordingly.

What do you think?


> This is the conflict I'm concerned about.
>
> Best,
> Jark
>
> On Fri, 15 Sept 2023 at 12:00, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Please see my comments inline.
> >
> > On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline below.
> >
> >
> > > > Hmm.. can you explain what you mean by "different watermark delay
> > > > definitions for each source"?
> > >
> > > For example, "table1" defines a watermark with delay 5 seconds,
> > > "table2" defines a watermark with delay 10 seconds. They have different
> > > watermark delay definitions. So it is also reasonable they have
> different
> > > watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> > > allows "20mins".
> > >
> >
> > I think the watermark delay you mentioned above is conceptually /
> > fundamentally different from the watermark-lag-threshold proposed in this
> > FLIP.
> >
> > It might be useful to revisit the semantics of these two concepts:
> > - watermark delay is used to account for the maximum amount of
> > out-of-orderliness that users expect (or are willing to wait for) for
> > records from a given source.
> > - watermark-lag-threshold is used to define when processing latency is no
> > longer important (e.g. because data is already stale).
> >
> > Even though users might expect different out of orderliness for different
> > sources, users do not necessarily have different definitions / thresholds
> > for when a record is considered "already stale".
> >
> >
> > >
> > > > I think there is probably misunderstanding here. FLIP-309 does NOT
> > > directly
> > > > specify when backlog is false. It is intentionally specified in such
> a
> > > way
> > > > that there will  not be any conflict between these rules.
> > >
> > > Do you mean FLIP-309 doesn't allow to specify backlog to be false?
&

Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-09-14 Thread Dong Lin
Hi Xintong,

Thanks for the comments! Please see my reply inline.

On Thu, Sep 14, 2023 at 4:17 PM Xintong Song  wrote:

> Thanks for preparing this FLIP, Dong & Jinhao.
>
> I'm overall +1 to this proposal. This is helpful for some cases that we are
> dealing with.
> - Wencong and I are preparing guidelines for migrating from DataSet API to
> DataStream API. We noticed that users have to define a custom trigger in
> order to process all data within one window, or use a very large time
> window and mock a timestamp for each record expecting the Long.MAX_VALUE
> watermark to trigger the window. EndOfStreamWindow would help reduce that
> effort.
> - With Hybrid Shuffle mode, we also encountered the problem that the
> downstream tasks are pulled up but cannot perform any processing because
> the upstream tasks do not emit any data until all inputs are consumed. The
> output-on-eof operator attribute semantic should also be helpful in
> addressing this issue, as mentioned in future work of this FLIP.
>

Great! Thanks for providing the information.


>
> I have a few minor comments.
>
> > A blocking input edge with pending records is the same as a source with
> > isBacklog=true when an operator determines its RecordAttributes for
> > downstream nodes.
> >
> - It is not very clear to me what this sentence means. In particular, how
> does it relate to the proposed changes in this FLIP?
>

This is needed for FLIP-331 to work with FLIP-327. More specifically, once
both FLIP-327 and FLIP-331 are accepted, we need a way to determine the
backlog status for input with blocking edge type.

Thanks for catching this. I have added this explanation in the FLIP to
hopefully reduce the confusion.


> - IIUC, EndOfStreamWindows is a new type of window that can be used in all
> cases where it takes a WindowAssigner. It is not limited to coGroup and
> aggregate. It might be better to make that more explicit in the FLIP.
>

Good point. I agree it is useful to make that explicit.

I have added a section "Analysis of APIs affected by this FLIP" in the FLIP
to clarify this.
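
For illustration, here is a rough sketch of how the proposed window assigner
could be used with coGroup. It assumes the EndOfStreamWindows#get() factory
method proposed in this FLIP (the class does not exist in released Flink yet),
so please treat it as a sketch rather than working code.

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class EndOfStreamCoGroupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Long> left = env.fromSequence(0, 9);
            DataStream<Long> right = env.fromSequence(5, 14);

            left.coGroup(right)
                .where(v -> v % 2, Types.LONG)
                .equalTo(v -> v % 2, Types.LONG)
                // Proposed in FLIP-331: a single window covering the whole input,
                // triggered only once all inputs have ended.
                .window(EndOfStreamWindows.get())
                .apply(new CoGroupFunction<Long, Long, String>() {
                    @Override
                    public void coGroup(Iterable<Long> l, Iterable<Long> r, Collector<String> out) {
                        out.collect(l + " <-> " + r);
                    }
                })
                .print();

            env.execute("EndOfStreamWindows coGroup sketch");
        }
    }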


Best,
Dong


>
> WDYT?
>
> Best,
>
> Xintong
>
>
>
> On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu  wrote:
>
> > Dear Dong,
> >
> > I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> > be
> > a valuable addition to Flink. However, I do have a few questions that I
> > would
> > like to discuss:
> >
> >
> > 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented by
> > TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
> > supports WindowedStream and AllWindowedStream to process all records
> > belonging to a key in a 'global' window under both STREAMING and BATCH
> > runtime execution mode.
> >
> >
> > However, besides coGroup and keyBy().aggregate(), other operators on
> > WindowedStream and AllWindowedStream, such as join/reduce, etc, currently
> > are still implemented based on WindowOperator.
> >
> >
> > In fact, these operators can also be implemented without using
> > WindowOperator
> > to prevent additional WindowAssigner#assignWindows or
> > triggerContext#onElement
> > invocation cost. Will there be plans to support these operators in the
> > future?
> >
> >
> > 2. When using EndOfStreamWindows, upstream operators no longer support
> > checkpointing. This limit may be too strict, especially when dealing with
> > bounded data in streaming runtime execution mode, where checkpointing
> > can still be useful.
> >
> > 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> > true, the
> > operator as well as its upstream operators will be executed in 'batch
> > mode' with
> > checkpointing disabled. I would like to understand the specific
> > implications of this
> > 'batch mode' and if there are any other changes associated with it?
> >
> > Additionally, I am curious to know if this 'batch mode' conflicts with
> the
> > 'mix mode'
> >
> > described in FLIP-327. While the coGroup and keyBy().aggregate()
> operators
> > on
> > EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to
> > true,
> > indicating support for the 'mixed mode', they also have isOutputOnEOF set
> > to true,
> > which suggests that the upstream operators should be executed in 'batch
> > mode'.
> > Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate
> > any
> > clarification on this matter.
> >
> > Thank you for taking the time to consider my feedback. I eagerly await
> > your r

Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-09-14 Thread Dong Lin
f the
four combinations of true/false values for these two attributes.

In the specific example you described above, let's say isOutputOnEOF = true
and isInternalSorterSupported = true. According to FLIP-331, the checkpoint
is disabled when this operator is running. And according to FLIP-327, this
operator will sort data internally, which means that Flink runtime should
not additionally sort its inputs. So overall the Flink job can comply with
the semantics of these two attributes consistently.
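
As a concrete illustration of how an operator could declare both attributes,
here is a rough sketch. OperatorAttributes/OperatorAttributesBuilder and the
setter names follow the shape proposed in FLIP-327/FLIP-331 and may differ
from the final API, so treat this as a sketch only.

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.BoundedOneInput;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    /** Aggregates internally and emits a single result only at end of input. */
    public class EofSumOperator extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

        private long sum;

        @Override
        public void processElement(StreamRecord<Long> element) {
            // Buffer/aggregate internally; nothing is emitted before end of input.
            sum += element.getValue();
        }

        @Override
        public void endInput() {
            output.collect(new StreamRecord<>(sum));
        }

        // Proposed attribute declaration (names may change): the operator only
        // emits on EOF and sorts/aggregates its input itself, so the runtime can
        // disable checkpointing for this part of the job and skip external sorting.
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder()
                    .setOutputOnEOF(true)
                    .setInternalSorterSupported(true)
                    .build();
        }
    }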


Thanks again for taking time to review this FLIP. Please let me know what
you think.

Best regards,
Dong


> Thank you for taking the time to consider my feedback. I eagerly await
> your response.
>
> Best regards,
>
> Wencong Liu
>
>
>
>
>
>
>
>
>
>
>
> At 2023-09-01 11:21:47, "Dong Lin"  wrote:
> >Hi all,
> >
> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> >deployment. The design doc can be found at
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> >.
> >
> >This FLIP introduces isOutputOnEOF operator attribute that JobManager can
> >use to optimize task deployment and resource utilization. In addition, it
> >also adds EndOfStreamWindows that can be used with the DataStream APIs
> >(e.g. cogroup, aggregate) to significantly increase throughput and reduce
> >resource utilization.
> >
> >We would greatly appreciate any comment or feedback you may have on this
> >proposal.
> >
> >Cheers,
> >Dong
>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
Hi Jark,

Please see my comments inline.

On Fri, Sep 15, 2023 at 10:35 AM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline below.


> > Hmm.. can you explain what you mean by "different watermark delay
> > definitions for each source"?
>
> For example, "table1" defines a watermark with delay 5 seconds,
> "table2" defines a watermark with delay 10 seconds. They have different
> watermark delay definitions. So it is also reasonable they have different
> watermark lag definitions, e.g., "table1" allows "10mins" and "table2"
> allows "20mins".
>

I think the watermark delay you mentioned above is conceptually /
fundamentally different from the watermark-lag-threshold proposed in this
FLIP.

It might be useful to revisit the semantics of these two concepts:
- watermark delay is used to account for the maximum amount of out-of-orderness
that users expect (or are willing to wait for) for records from a given source.
- watermark-lag-threshold is used to define when processing latency is no
longer important (e.g. because data is already stale).

Even though users might expect different out-of-orderness for different
sources, users do not necessarily have different definitions / thresholds
for when a record is considered "already stale".
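
To make the distinction concrete, here is a small sketch. The per-source
WatermarkStrategy calls are existing Flink APIs; the job-level threshold key
is only illustrative of what FLIP-328 discusses, and its exact name and format
may differ.

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BacklogThresholdSketch {
        public static void main(String[] args) {
            // Per-source watermark delay: how much out-of-orderness each source tolerates.
            WatermarkStrategy<String> table1Strategy =
                    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));
            WatermarkStrategy<String> table2Strategy =
                    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

            // Job-level threshold discussed in FLIP-328 (key name is illustrative):
            // once the watermark lags behind wall-clock time by more than this,
            // the records are treated as stale/backlog, regardless of each
            // source's own out-of-orderness setting.
            Configuration conf = new Configuration();
            conf.setString("pipeline.backlog.watermark-lag-threshold", "10 min");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }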


>
> > I think there is probably misunderstanding here. FLIP-309 does NOT
> directly
> > specify when backlog is false. It is intentionally specified in such a
> way
> > that there will  not be any conflict between these rules.
>
> Do you mean FLIP-309 doesn't allow to specify backlog to be false?
> Is this mentioned in FLIP-309? This is completely different from what I
>

Can you explain what you mean by "allow to specify backlog to be false"?

If what you mean is that "can invoke setIsProcessingBacklog(false)", then
FLIP-309 supports doing this.

If what you mean is that "any invocation of setIsProcessingBacklog(false)
will set the job's backlog status to false", then FLIP-309 does not support
this. I believe the existing Java doc of this API and FLIP-309 is
compatible with this explanation.

Note that we can not simply enforce the semantics of "any invocation of
setIsProcessingBacklog(false) will set the job's backlog status to false".
Suppose we have a job with two operators, where operatorA invokes
setIsProcessingBacklog(false) and operatorB invokes
setIsProcessingBacklog(true). There will be conflict if we use the
semantics of "any invocation of setIsProcessingBacklog(false) will set the
job's backlog status to false".

Would this answer your question?

Best,
Dong


> understand. From the API interface "ctx.setIsProcessingBacklog(boolean)",
> it allows users to invoke "setIsProcessingBacklog(false)". And FLIP-309
> also says "MySQL CDC source should report isProcessingBacklog=false
> at the beginning of the changelog stage." If not, maybe we need to revisit
> FLIP-309.


> Best,
> Jark
>
>
>
> On Fri, 15 Sept 2023 at 08:41, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Do you have any follow-up comment?
> >
> > My gut feeling is that suppose we need to support per-source watermark
> lag
> > specification in the future (not sure we have a use-case for this right
> > now), we can add such a config in the future with a follow-up FLIP. The
> > job-level config will still be useful as it makes users' configuration
> > simpler for common scenarios.
> >
> > If it is OK, can we agree to make incremental progress for Flink and
> start
> > a voting thread for this FLIP?
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:
> >
> > > Hi Dong,
> > >
> > > Please see my comments inline.
> > >
> > > >  As a result, the proposed job-level
> > > > config will be applied only in the changelog stage. So there is no
> > > > difference between these two approaches in this particular case,
> right?
> > >
> > > How the job-level config can be applied ONLY in the changelog stage?
> > > I think it is only possible if it is implemented by the CDC source
> > itself,
> > > because the framework doesn't know which stage of the source is.
> > > Know that the CDC source may emit watermarks with a very small lag
> > > in the snapshot stage, and the job-level config may turn the backlog
> > > status into false.
> > >
> > > > On the other hand, per-source config will be necessary if users want
> to
> > > > apply different watermark lag thresholds for different sources in the
> > > same
> >

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-14 Thread Dong Lin
Hi Jark,

Do you have any follow-up comment?

My gut feeling is that suppose we need to support per-source watermark lag
specification in the future (not sure we have a use-case for this right
now), we can add such a config in the future with a follow-up FLIP. The
job-level config will still be useful as it makes users' configuration
simpler for common scenarios.

If it is OK, can we agree to make incremental progress for Flink and start
a voting thread for this FLIP?

Thanks,
Dong


On Mon, Sep 11, 2023 at 4:41 PM Jark Wu  wrote:

> Hi Dong,
>
> Please see my comments inline.
>
> >  As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
>
> How the job-level config can be applied ONLY in the changelog stage?
> I think it is only possible if it is implemented by the CDC source itself,
> because the framework doesn't know which stage of the source is.
> Know that the CDC source may emit watermarks with a very small lag
> in the snapshot stage, and the job-level config may turn the backlog
> status into false.
>
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job.
>
> We also have different watermark delay definitions for each source,
> I think it's also reasonable and necessary to have different watermark
> lags.
>
>
> > Each source can have its own rule that specifies when the backlog can be
> true
> > (e.g. MySql CDC says the backlog should be true during the snapshot
> stage).
> > And we can have a job-level config that specifies when the backlog should
> > be true. Note that it is designed in such a way that none of these rules
> > specify when the backlog should be false. That is why there is no
> conflict
> > by definition.
>
> IIUC, FLIP-309 provides `setIsProcessingBacklog` to specify when the
> backlog
> is true and when is FALSE. This conflicts with the job-level config as it
> will turn
> the status into true.
>
> > If I understand your comments correctly, you mean that we might have a
> > Flink SQL DDL with user-defined watermark expressions. And users also
> want
> > to set the backlog to true if the watermark generated by that
> > user-specified expression exceeds a threshold.
>
> No. I mean the source may not support generating watermarks, so the
> watermark
> expression is applied in a following operator (instead of in the source
> operator).
> This will result in the watermark lag not working in this case, which may
> confuse users.
>
> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > to switch from backlog=true to backlog=false. In the future, we will
> remove
> > this limitation and also support switching from backlog=false to
> > backlog=true.
>
> I can understand it may be difficult to support runtime mode switching back
> and forth.
> However, I think this should be a limitation of FLIP-327, not FLIP-328.
> IIUC,
> FLIP-309 doesn't have this limitation, right? I just don't understand
> what's the
> challenge to switch a flag?
>
> Best,
> Jark
>
>
> On Sun, 10 Sept 2023 at 19:44, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Thanks for the comments. Please see my comments inline.
> >
> > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:
> >
> > > Hi Xuannan,
> > >
> > > I leave my comments inline.
> > >
> > > > In the case where a user wants to
> > > > use a CDC source and also determine backlog status based on watermark
> > > > lag, we still need to define the rule when that occurs
> > >
> > > This rule should be defined by the source itself (who knows backlog
> > best),
> > > not by the framework. In the case of CDC source, it reports
> > isBacklog=true
> > > during snapshot stage, and report isBacklog=false during changelog
> stage
> > if
> > > watermark-lag is within the threshold.
> > >
> >
> > I am not sure I fully understand the difference between adding a
> job-level
> > config vs. adding a per-source config.
> >
> > In the case of CDC, its watermark lag should be either undefined or
> > really large in the snapshot stage. As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, ri

Re: [VOTE] FLIP-357: Deprecate Iteration API of DataStream

2023-09-13 Thread Dong Lin
Thanks Wencong for the FLIP.

+1 (binding)

On Thu, Sep 14, 2023 at 12:36 PM Wencong Liu  wrote:

> Hi dev,
>
>
> I'd like to start a vote on FLIP-357.
>
>
> Discussion thread:
> https://lists.apache.org/thread/shf77phc0wzlbj06jsfj3nclxnm2mrv5
> FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream
>
>
> Best regards,
> Wencong Liu


Re: [VOTE] FLIP-355: Add parent dir of files to classpath using yarn.provided.lib.dirs

2023-09-13 Thread Dong Lin
Thanks Archit for the FLIP.

+1 (binding)

Regards,
Dong

On Thu, Sep 14, 2023 at 1:47 AM Archit Goyal 
wrote:

> Hi everyone,
>
> Thanks for reviewing the FLIP-355 Add parent dir of files to classpath
> using yarn.provided.lib.dirs :
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+355%3A+Add+parent+dir+of+files+to+classpath+using+yarn.provided.lib.dirs
>
> Following is the discussion thread :
> https://lists.apache.org/thread/gv0ro4jsq4o206wg5gz9z5cww15qkvb9
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (until September 15, 12:00AM GMT) unless there is an objection or an
> insufficient number of votes.
>
> Thanks,
> Archit Goyal
>


Re: [VOTE] FLIP-361: Improve GC Metrics

2023-09-13 Thread Dong Lin
Thanks for the FLIP!

+1(binding)

On Wed, Sep 13, 2023 at 9:16 PM Gyula Fóra  wrote:

> Hi All!
>
> Thanks for all the feedback on FLIP-361: Improve GC Metrics [1][2]
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours unless there is an objection or insufficient votes.
>
> Cheers,
> Gyula
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-361%3A+Improve+GC+Metrics
> [2] https://lists.apache.org/thread/qqqv54vyr4gbp63wm2d12q78m8h95xb2
>


Re: [VOTE] FLIP-334: Decoupling autoscaler and kubernetes and support the Standalone Autoscaler

2023-09-13 Thread Dong Lin
Thank you Rui for the proposal.

+1 (binding)

On Wed, Sep 13, 2023 at 10:52 AM Rui Fan <1996fan...@gmail.com> wrote:

> Hi all,
>
> Thanks for all the feedback about the FLIP-334:
> Decoupling autoscaler and kubernetes and
> support the Standalone Autoscaler[1].
> This FLIP was discussed in [2].
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (until Sep 16th 11:00 UTC+8) unless there is an objection or
> insufficient votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-334+%3A+Decoupling+autoscaler+and+kubernetes+and+support+the+Standalone+Autoscaler
> [2] https://lists.apache.org/thread/kmm03gls1vw4x6vk1ypr9ny9q9522495
>
> Best,
> Rui
>


Re: [VOTE] Release flink-connector-hbase v3.0.0, release candidate 2

2023-09-12 Thread Dong Lin
+1 (binding)

- Verified that the source release can be built successfully.
- Verified that the checksum and gpg files match the corresponding source
release files and maven artifacts.
- Verified that the source archives do not contain any binary file.
- Checked that the source code tag looks good.
- Reviewed release notes.
- Reviewed website PR



On Tue, Sep 12, 2023 at 3:43 PM Leonard Xu  wrote:

> +1 (binding)
>
> - built from source code succeeded
> - verified signatures
> - verified hashsums
> - checked release notes
> - reviewed the web PR
>
> Best,
> Leonard
>
>
> > On Sep 11, 2023, at 7:01 PM, Danny Cranmer 
> wrote:
> >
> > Hey Martijn, thanks for picking this up.
> >
> > +1 (binding)
> >
> > - Release notes look good
> > - Sigs and checksums look good for source archive and Maven repo
> > - Verified there are no binaries in the source archive
> > - Tag exists in Github
> > - Reviewed website PR
> > - Contents of maven repo looks complete
> > - Source archive builds with Maven
> > - Reviewed NOTICE Files
> >  - Note: Copyright header year needs updating
> >
> > Thanks,
> > Danny
> >
> > On Tue, Sep 5, 2023 at 11:36 AM Ferenc Csaky  >
> > wrote:
> >
> >> Hi,
> >>
> >> Thanks Martijn for initiating the release!
> >>
> >> +1 (non-binding)
> >>
> >> - checked signatures and checksums
> >> - checked source has no binaries
> >> - checked LICENSE and NOTICE files
> >> - approved web PR
> >>
> >> Cheers,
> >> Ferenc
> >>
> >>
> >>
> >>
> >> --- Original Message ---
> >> On Monday, September 4th, 2023 at 12:54, Samrat Deb <
> decordea...@gmail.com>
> >> wrote:
> >>
> >>
> >>>
> >>>
> >>> Hi,
> >>>
> >>> +1 (non-binding)
> >>>
> >>> Verified NOTICE files
> >>> Verified CheckSum and signatures
> >>> Glanced through PR[1] , Looks good to me
> >>>
> >>> Bests,
> >>> Samrat
> >>>
> >>> [1]https://github.com/apache/flink-web/pull/591
> >>>
> >>>
>  On 04-Sep-2023, at 2:22 PM, Ahmed Hamdy hamdy10...@gmail.com wrote:
> 
>  Hi Martijn,
>  +1 (non-binding)
> 
>  - verified Checksums and signatures
>  - no binaries in source
>  - Checked NOTICE files contains migrated artifacts
>  - tag is correct
>  - Approved Web PR
> 
>  Best Regards
>  Ahmed Hamdy
> 
>  On Fri, 1 Sept 2023 at 15:35, Martijn Visser martijnvis...@apache.org
>  wrote:
> 
> > Hi everyone,
> >
> > Please review and vote on the release candidate #2 for the version
> >> 3.0.0,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which
> >> includes:
> > * JIRA release notes [1],
> > * the official Apache source release to be deployed to
> >> dist.apache.org
> > [2],
> > which are signed with the key with fingerprint
> > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag v3.0.0-rc2 [5],
> > * website pull request listing the new release [6].
> >
> > This replaces the old, cancelled vote of RC1 [7]. This version is the
> > externalized version which is compatible with Flink 1.16 and 1.17.
> >
> > The vote will be open for at least 72 hours. It is adopted by
> >> majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Release Manager
> >
> > [1]
> >
> >
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352578
> > [2]
> >
> >
> >>
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-hbase-3.0.0-rc2
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> >> https://repository.apache.org/content/repositories/orgapacheflink-1650
> > [5]
> >
> >> https://github.com/apache/flink-connector-hbase/releases/tag/v3.0.0-rc2
> > [6] https://github.com/apache/flink-web/pull/591
> > [7] https://lists.apache.org/thread/wbl6sc86q9s5mmz5slx4z09svh91cpr0
> >>
>
>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-12 Thread Dong Lin
use-case, maybe the right solution is to update
the source to support generating watermark


> > You are right that this is a limitation. However, this is only a
> short-term
> > limitation which we added to make sure that we can focus on the
> capability
> > to switch from backlog=true to backlog=false. In the future, we will
> remove
> > this limitation and also support switching from backlog=false to
> > backlog=true.
>
> I can understand it may be difficult to support runtime mode switching back
> and forth.
> However, I think this should be a limitation of FLIP-327, not FLIP-328.
> IIUC,
> FLIP-309 doesn't have this limitation, right? I just don't understand
> what's the
> challenge to switch a flag?
>

Instead of thinking from a developer's perspective, it might be better to
compare these two approaches based on which approach is more user-friendly.

Note that there is no downside to introducing this limitation in FLIP-328.
This is because there is no use-case for switching the backlog from false
to true until Flink runtime can utilize this change to improve its
performance, which is the case in the near future.

Here are the advantages of introducing this limitation in FLIP-328 compared
to introducing this limitation in FLIP-327:

1) Flink users can know that the Flink job will NOT run in backlog=true mode
once the job is running in backlog=false mode. Otherwise, users may be
expecting the job to run with higher throughput when watermark lag goes up.

It might be useful to note that FLIP-327 does not introduce any API that is
used directly by end users. So it is not straightforward to specify this
limitation for end users in FLIP-327.

2) Developers of Flink operators who want to take advantage of the APIs
introduced in FLIP-327 (e.g. processRecordAttributes1) know that they don't
need to handle the case where backlog switches from false to true.

Although we can move the limitation from FLIP-328 to FLIP-327, it will just
become harder to explain to Flink developers that "source can switch
backlog from false to true but you don't need to handle this situation in
these APIs".

I hope the above reasoning can explain why it is simpler to introduce this
limitation in FLIP-328. Please be aware that this is just a short-term
limitation that aims to minimize the back-and-forth change of APIs while
making the semantics clear to end users.

Best,
Dong


> Best,
> Jark
>
>
> On Sun, 10 Sept 2023 at 19:44, Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Thanks for the comments. Please see my comments inline.
> >
> > On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:
> >
> > > Hi Xuannan,
> > >
> > > I leave my comments inline.
> > >
> > > > In the case where a user wants to
> > > > use a CDC source and also determine backlog status based on watermark
> > > > lag, we still need to define the rule when that occurs
> > >
> > > This rule should be defined by the source itself (who knows backlog
> > best),
> > > not by the framework. In the case of CDC source, it reports
> > isBacklog=true
> > > during snapshot stage, and report isBacklog=false during changelog
> stage
> > if
> > > watermark-lag is within the threshold.
> > >
> >
> > I am not sure I fully understand the difference between adding a
> job-level
> > config vs. adding a per-source config.
> >
> > In the case of CDC, its watermark lag should be either undefined or
> > really large in the snapshot stage. As a result, the proposed job-level
> > config will be applied only in the changelog stage. So there is no
> > difference between these two approaches in this particular case, right?
> >
> > There are two advantages of the job-level config over per-source config:
> >
> > 1) Configuration is simpler. For example, suppose a user has a Flink job
> > that consumes records from multiple Kafka sources and wants to determine
> > backlog status for these Kafka sources using the same watermark lag
> > threshold, there is no need for users to repeatedly specify this
> threshold
> > for each source.
> >
> > 2) There is a smaller number of public APIs overall. In particular,
> instead
> > of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API
> for
> > every source operator that has event-time watermark lag defined, we only
> > need to add one job-level config. Less public API means better simplicity
> > and maintainability in general.
> >
> > On the other hand, per-source config will be necessary if users want to
> > apply different watermark lag thresholds for different sources in the
> same
> > job. Personally, I find this a b

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-10 Thread Dong Lin
Hi Jark,

Thanks for the comments. Please see my comments inline.

On Sat, Sep 9, 2023 at 4:13 PM Jark Wu  wrote:

> Hi Xuannan,
>
> I leave my comments inline.
>
> > In the case where a user wants to
> > use a CDC source and also determine backlog status based on watermark
> > lag, we still need to define the rule when that occurs
>
> This rule should be defined by the source itself (who knows backlog best),
> not by the framework. In the case of CDC source, it reports isBacklog=true
> during snapshot stage, and report isBacklog=false during changelog stage if
> watermark-lag is within the threshold.
>

I am not sure I fully understand the difference between adding a job-level
config vs. adding a per-source config.

In the case of CDC, its watermark lag should be either undefined or
really large in the snapshot stage. As a result, the proposed job-level
config will be applied only in the changelog stage. So there is no
difference between these two approaches in this particular case, right?

There are two advantages of the job-level config over per-source config:

1) Configuration is simpler. For example, suppose a user has a Flink job
that consumes records from multiple Kafka sources and wants to determine
backlog status for these Kafka sources using the same watermark lag
threshold, there is no need for users to repeatedly specify this threshold
for each source.

2) There is a smaller number of public APIs overall. In particular, instead
of repeatedly adding a setProcessingBacklogWatermarkLagThreshold() API for
every source operator that has event-time watermark lag defined, we only
need to add one job-level config. Less public API means better simplicity
and maintainability in general.

On the other hand, per-source config will be necessary if users want to
apply different watermark lag thresholds for different sources in the same
job. Personally, I find this a bit counter-intuitive for users to specify
different watermark lag thresholds in the same job.

Do you think there is any real-word use-case that requires this? Could you
provide a specific use-case where per-source config can provide an
advantage over the job-level config?


> I think it's not intuitive to combine it with the logical OR operation.
> Even for the
> combination logic of backlog status from different channels, FLIP-309 said
> it is
> "up to the operator to determine its output records' isBacklog value" and
> proposed
> 3 different strategies. Therefore, I think backlog status from a single
> source should
> be up to the source.


For both the job-level config and the per-source config, it is eventually
up to the user to decide the computation logic of the backlog status.
Whether this mechanism is implemented at the per-source level or framework
level is probably more like an implementation detail.

Eventually, I think the choice between these two approaches depends on
whether we have any use-case for users to specify different watermark lag
thresholds in the same job.


>
> IMO, a better API design is not how to resolve conflicts but not
> introducing conflicts.


Just to clarify, the current FLIP does not introduce any conflict. Each
source can have its own rule that specifies when the backlog can be true
(e.g. MySql CDC says the backlog should be true during the snapshot stage).
And we can have a job-level config that specifies when the backlog should
be true. Note that it is designed in such a way that none of these rules
specify when the backlog should be false. That is why there is no conflict
by definition.



> Letting the source determine the backlog status removes the conflicts and I
> don't see big disadvantages.
>
> > It should not confuse the user that
> > DataStream#assignTimestampsAndWatermarks doesn't work with
> > backlog.watermark-lag-threshold, as it is not a source.
>
> Hmm, so this configuration may confuse Flink SQL users, because all
> watermarks
> are defined on the source DDL, but it may use a separate operator to emit
> watermarks
> if the source doesn't support emitting watermarks.
>

If I understand your comments correctly, you mean that we might have a
Flink SQL DDL with user-defined watermark expressions. And users also want
to set the backlog to true if the watermark generated by that
user-specified expression exceeds a threshold.

That is a good point and use-case. I agree we should also cover this
scenario. And we can update FLIP-328 to mention that the job-level config
will also be applicable when the watermark derived from the Flink SQL DDL
exceeds this threshold. Would this address your concern?
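
For example, a watermark declared in a DDL like the following (a minimal,
hypothetical table using the datagen connector) would then also be covered by
the job-level threshold, even if the watermark ends up being emitted by a
separate watermark operator rather than by the source itself:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DdlWatermarkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Watermark defined by a user expression in the DDL; under the proposal
            // above, the job-level watermark-lag threshold would apply to it as well.
            tEnv.executeSql(
                    "CREATE TABLE orders ("
                            + "  order_id BIGINT,"
                            + "  order_time TIMESTAMP(3),"
                            + "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND"
                            + ") WITH ('connector' = 'datagen')");
        }
    }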


>
> > I think the description in the FLIP actually means the other way
> > around, where the job can never switch back to batch mode once it has
> > switched into streaming mode. This is to align with the current state
> > of FLIP-327[1], where only switching from batch to stream mode is
> > supported.
>
> This sounds like a limitation of FLIP-327 (that execution mode depends on
> backlog status).
> But the 

Re: [VOTE] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-09-10 Thread Dong Lin
Thanks Allison for proposing the FLIP.

+1 (binding)

On Fri, Sep 8, 2023 at 4:21 AM Allison Chang 
wrote:

> Hi everyone,
>
> Would like to start the VOTE for FLIP-323<
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-323%3A+Support+Attached+Execution+on+Flink+Application+Completion+for+Batch+Jobs>
> which proposes to introduce attached execution for batch jobs. The
> discussion thread can be found here<
> https://lists.apache.org/thread/d3toldk6qqjh2fnbmqthlfkj9rc6lwgl>:
>
>
> Best,
>
> Allison Chang
>
>


Re: [DISCUSS] FLIP-357: Deprecate Iteration API of DataStream

2023-08-31 Thread Dong Lin
Thanks Wencong for initiating the discussion.

+1 for the proposal.

On Fri, Sep 1, 2023 at 12:00 PM Wencong Liu  wrote:

> Hi devs,
>
> I would like to start a discussion on FLIP-357: Deprecate Iteration API of
> DataStream [1].
>
> Currently, the Iteration API of DataStream is incomplete. For instance, it
> lacks support
> for iteration in sync mode and exactly once semantics. Additionally, it
> does not offer the
> ability to set iteration termination conditions. As a result, it's hard
> for developers to
> build an iteration pipeline by DataStream in the practical applications
> such as machine learning.
>
> FLIP-176: Unified Iteration to Support Algorithms [2] has introduced a
> unified iteration library
> in the Flink ML repository. This library addresses all the issues present
> in the Iteration API of
> DataStream and could provide solution for all the iteration use-cases.
> However, maintaining two
> separate implementations of iteration in both the Flink repository and the
> Flink ML repository
> would introduce unnecessary complexity and make it difficult to maintain
> the Iteration API.
>
> As such I propose deprecating the Iteration API of DataStream and removing
> it completely in the next
> major version. In the future, if other modules in the Flink repository
> require the use of the
> Iteration API, we can consider extracting all Iteration implementations
> from the Flink ML repository
> into an independent module.
>
> Looking forward to your feedback.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
>
> Best regards,
>
> Wencong Liu


[DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-08-31 Thread Dong Lin
Hi all,

Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
deployment. The design doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
.

This FLIP introduces isOutputOnEOF operator attribute that JobManager can
use to optimize task deployment and resource utilization. In addition, it
also adds EndOfStreamWindows that can be used with the DataStream APIs
(e.g. cogroup, aggregate) to significantly increase throughput and reduce
resource utilization.

We would greatly appreciate any comment or feedback you may have on this
proposal.

Cheers,
Dong


Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-18 Thread Dong Lin
Hi Piotr,

Thanks for the explanation.

To recap our offline discussion, there is a concern regarding the
capability to dynamically switch between stream and batch modes. This
concern is around unforeseen behaviors such as bugs or performance
regressions, which we might not be aware of yet. The reason for this
concern is that this feature has a fundamental impact on the Flink
runtime's behavior.

Due to the above concern, I agree it is reasonable to annotate related APIs
as experimental. This step would provide us with the flexibility to modify
these APIs if issues arise in the future. This annotation also serves as a
note to users that this functionality might not perform as well as expected.

Though I believe that we can ensure the reliability of this feature through
good design and code reviews, comprehensive unit tests, and thorough
integration testing, I agree that it is reasonable to be extra cautious in
this case. Also, it should be OK to delay making these APIs
non-experimental by 1-2 releases.

I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs
as experimental. Please let me know if you think any other API should also
be marked as experimental.

Thanks!
Dong

On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Operators API is unfortunately also our public facing API and I mean the
> APIs that we will add there should also be marked `@Experimental` IMO.
>
> The config options should also be marked as experimental (both
> annotated @Experimental and noted the same thing in the docs,
> if @Experimental annotation is not automatically mentioned in the docs).
>
> > Alternatively, how about we add a doc for
> checkpointing.interval-during-backlog explaining its impact/concern as
> discussed above?
>
> We should do this independently from marking the APIs/config options as
> `@Experimental`
>
> Best,
> Piotrek
>
> On Fri, Aug 11, 2023 at 2:55 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the reply!
> >
> > On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski  >
> > wrote:
> >
> > > Hi,
> > >
> > > Sorry for the long delay in responding!
> > >
> > > >  Given that it is an optional feature that can be
> > > > turned off by users, it might be OK to just let users try it out and
> we
> > > can
> > > > fix performance issues once we detect any of them. What do you think?
> > >
> > > I think it's fine. It would be best to mark this feature as
> experimental,
> > > and
> > > we say that the config keys or the default values might change in the
> > > future.
> > >
> >
> > In general I agree we can mark APIs that determine "whether to enable
> > dynamic switching between stream/batch mode" as experimental.
> >
> > However, I am not sure we have such an API yet. The APIs added in this
> FLIP
> > are intended to be used by operator developers rather than end users. End
> > users can enable this capability by setting
> > execution.checkpointing.interval-during-backlog = Long.MAX and uses a
> > source which might implicitly set backlog statu (e.g. HybridSource). So
> > execution.checkpointing.interval-during-backlog is the only user-facing
> > APIs that can always control whether this feature can be used.
> >
> > However, execution.checkpointing.interval-during-backlog itself is not
> tied
> > to FLIP-327.
> >
> > Do you mean we should set checkpointing.interval-during-backlog as
> > experimental? Alternatively, how about we add a doc for
> > checkpointing.interval-during-backlog explaining its impact/concern as
> > discussed above?
> >
> > Best,
> > Dong
> >
> >
> > > > Maybe we can revisit the need for such a config when we
> > introduce/discuss
> > > > the capability to switch backlog from false to true in the future.
> What
> > > do
> > > > you think?
> > >
> > > Sure, we can do that.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Sun, Jul 23, 2023 at 2:32 PM Dong Lin  wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks a lot for the explanation. Please see my reply inline.
> > > >
> > > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > > piotr.nowoj...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks a lot for the answers. I can now only briefly answer your
> last
> > > > > email.
> > > > >
> > > > >

Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement

2023-08-11 Thread Dong Lin
e added
> they are hard to take back and may cause problems for future developments.
> For this specific assumption, it doesn't come from any contract. It just
> happens to be true with the current implementation.
>
> Best,
>
> Xintong
>
>
>
> On Fri, Aug 11, 2023 at 10:43 AM Dong Lin  wrote:
>
> > Hi Xintong,
> >
> > Thanks for the detailed explanation of your concern. Let me chime in and
> > provide my thoughts on this issue.
> >
> > Please see my comments for each point inline.
> >
> > Overall, it seems that the main concern with this FLIP is that the 2%
> > throughput saving might not be worth the added implementation complexity.
> >
> > IMO, it really depends on what is Flink's *long-term* performance goal in
> > the batch-only scenario. Do we just need Flink to perform reasonably well
> > (maybe with 80-90% throughput of that of Spark) for batch, or do we
> expect
> > Flink to have best possible performance with minimal possible cost?
> >
> > If it is the former goal, then I agree it is debatable whether this FLIP
> is
> > worthwhile. And I would be OK if we decide to postpone this FLIP
> > indefinitely. However, we (the whole Flink developer community) need to be aware
> > that this effectively tells users that: You should only use Flink to run
> a
> > batch-only job if your job needs to be run in stream mode, and Flink is
> > not intended to be an optimal engine for batch computation.
> >
> > If it is the latter goal, the implementation complexity involved in this
> > FLIP will be an inevitable one that we should take rather than avoid.
> Also
> > note that it is also better to make infra change sooner than later. I
> would
> > say the resulting code (associated with this complexity) is also a
> valuable
> > asset of Flink (or any other engine) that intends to be stream-batch
> > unified.
> >
> > IMO, we should aim for the 2nd goal and give users (and developers) the
> > idea that it is a good investment to migrate their existing batch-only
> jobs
> > (and related infra) to Flink because Flink should be able to achieve the
> > best performance for batch-only jobs in the long term.
> >
> > What do you think?
> >
> > Best,
> > Dong
> >
> >
> > On Thu, Aug 10, 2023 at 9:16 PM Xintong Song 
> > wrote:
> >
> > > Hi Yunfeng,
> > >
> > > Thanks for preparing this FLIP. I'm respectful for the efforts you
> > already
> > > put into the PoC implementation and benchmarks. However, I have to say
> > I'm
> > > quite concerned about this proposal.
> > >
> > > 1. The FLIP is based on the assumption that in non-timestamp scenarios
> > > StreamRecord is the only possible sub-type of StreamElement. This
> > > assumption is true with the current sub-types, but is not by
> definition,
> > > and can be broken if we want to introduce more sub-types in future. Or
> > put
> > > it differently, introducing this optimization would limit us from
> > > flexibility extending StreamElement in future, because that may invalid
> > > this optimization and thus introduce regressions.
> > >
> >
> > The goal of this FLIP is to avoid the unnecessary per-record overhead
> (e.g.
> > related to timestamp) that is currently involved to process StreamRecord
> in
> > scenarios where StreamRecord does not need timestamp (which is known at
> job
> > compile time). I suppose we both agree that this can be achieved as of
> the
> > current Flink implementation.
> >
> > In the future, if we have more use-case that requires addition of new
> > sub-types of StreamElement, I believe we should still be able to address
> > such use-case without breaking the optimization introduced in this FLIP.
> > The intuition is that if the use-case knows for sure there is only one
> type
> > of StreamRecord, then the job should **always** be able to hardcode this
> > logic at compile time instead of checking the record type on a per-record
> > basis at runtime.
> >
> > Also know that we might not have to add new subtypes of StreamRecord in
> > order to achieve the future use-case. It seems like an implementation
> > detail that should be decided based on the concrete use-case.
> >
> > If this is not the case, can you please provide a concrete use-case (can
> be
> > a fabricated one) where this optimization can not be achieved? This can
> > help us evaluate whether this concern is realistic.
> >
> >
> > >
> > > 2. Changing LatencyMark

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-11 Thread Dong Lin
Hi Piotr,

Thanks for the reply!

On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski 
wrote:

> Hi,
>
> Sorry for the long delay in responding!
>
> >  Given that it is an optional feature that can be
> > turned off by users, it might be OK to just let users try it out and we
> can
> > fix performance issues once we detect any of them. What do you think?
>
> I think it's fine. It would be best to mark this feature as experimental,
> and
> we say that the config keys or the default values might change in the
> future.
>

In general I agree we can mark APIs that determine "whether to enable
dynamic switching between stream/batch mode" as experimental.

However, I am not sure we have such an API yet. The APIs added in this FLIP
are intended to be used by operator developers rather than end users. End
users can enable this capability by setting
execution.checkpointing.interval-during-backlog = Long.MAX and using a
source which might implicitly set the backlog status (e.g. HybridSource). So
execution.checkpointing.interval-during-backlog is the only user-facing
API that can always control whether this feature can be used.

However, execution.checkpointing.interval-during-backlog itself is not tied
to FLIP-327.

Do you mean we should set checkpointing.interval-during-backlog as
experimental? Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

Best,
Dong


> > Maybe we can revisit the need for such a config when we introduce/discuss
> > the capability to switch backlog from false to true in the future. What
> do
> > you think?
>
> Sure, we can do that.
>
> Best,
> Piotrek
>
> On Sun, Jul 23, 2023 at 2:32 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks a lot for the explanation. Please see my reply inline.
> >
> > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks a lot for the answers. I can now only briefly answer your last
> > > email.
> > >
> > > > It is possible that spilling to disks might cause larger overhead.
> IMO
> > it
> > > > is an orthogonal issue already existing in Flink. This is because a
> > Flink
> > > > job running batch mode might also be slower than its throughput in
> > stream
> > > > mode due to the same reason.
> > >
> > > Yes, I know, but the thing that worries me is that previously only a
> user
> > > alone
> > > could decide whether to use batch mode or streaming, and in practice
> one
> > > user would rarely (if ever) use both for the same problem/job/query. If
> > his
> > > intention was to eventually process live data, he was using streaming
> > even
> > > if there was a large backlog at the start (apart of some very few very
> > > power
> > > users).
> > >
> > > With this change, we want to introduce a mode that would be switching
> > back
> > > and forth between streaming and "batch in streaming" automatically. So
> a
> > > potential performance regression would be much more visible and painful
> > > at the same time. If batch query runs slower then it could, it's kind
> of
> > > fine as
> > > it will end at some point. If streaming query during large back
> pressure
> > > maybe
> > > temporary load spike switches to batch processing, that's a bigger
> deal.
> > > Especially if batch processing mode will not be able to actually even
> > > handle
> > > the normal load, after the load spike. In that case, the job could
> never
> > > recover
> > > from the backpressure/backlog mode.
> > >
> >
> > I understand you are concerned with the risk of performance regression
> > introduced due to switching to batch mode.
> >
> > After thinking about this more, I think this existing proposal meets the
> > minimum requirement of "not introducing regression for existing jobs".
> The
> > reason is that even if batch mode can be slower than stream mode for some
> > operators in some cases, this is an optional feature that will only be
> > enabled if a user explicitly overrides the newly introduced config to
> > non-default values. Existing jobs that simply upgrade their Flink library
> > version will not suffer any performance regression.
> >
> > More specifically, in order to switch to batch mode, users will need to
> > explicitly set execution.checkpointing.interval-during-backlog to 0. And
> > users can always explicit

Re: [DISCUSS] FLIP-330: Support specifying record timestamp requirement

2023-08-10 Thread Dong Lin
Hi Xintong,

Thanks for the detailed explanation of your concern. Let me chime in and
provide my thoughts on this issue.

Please see my comments for each point inline.

Overall, it seems that the main concern with this FLIP is that the 2%
throughput saving might not be worth the added implementation complexity.

IMO, it really depends on what is Flink's *long-term* performance goal in
the batch-only scenario. Do we just need Flink to perform reasonably well
(maybe with 80-90% throughput of that of Spark) for batch, or do we expect
Flink to have best possible performance with minimal possible cost?

If it is the former goal, then I agree it is debatable whether this FLIP is
worthwhile. And I would be OK if we decide to postpone this FLIP
indefinitely. However, we (the whole Flink developer community) need to be aware
that this effectively tells users that: You should only use Flink to run a
batch-only job if your job needs to be run in stream mode, and Flink is
not intended to be an optimal engine for batch computation.

If it is the latter goal, the implementation complexity involved in this
FLIP will be an inevitable one that we should take rather than avoid. Also
note that it is better to make infra changes sooner rather than later. I would
say the resulting code (associated with this complexity) is also a valuable
asset of Flink (or any other engine) that intends to be stream-batch
unified.

IMO, we should aim for the 2nd goal and give users (and developers) the
idea that it is a good investment to migrate their existing batch-only jobs
(and related infra) to Flink because Flink should be able to achieve the
best performance for batch-only jobs in the long term.

What do you think?

Best,
Dong


On Thu, Aug 10, 2023 at 9:16 PM Xintong Song  wrote:

> Hi Yunfeng,
>
> Thanks for preparing this FLIP. I'm respectful for the efforts you already
> put into the PoC implementation and benchmarks. However, I have to say I'm
> quite concerned about this proposal.
>
> 1. The FLIP is based on the assumption that in non-timestamp scenarios
> StreamRecord is the only possible sub-type of StreamElement. This
> assumption is true with the current sub-types, but is not by definition,
> and can be broken if we want to introduce more sub-types in future. Or put
> it differently, introducing this optimization would limit us from
> flexibility extending StreamElement in future, because that may invalid
> this optimization and thus introduce regressions.
>

The goal of this FLIP is to avoid the unnecessary per-record overhead (e.g.
related to timestamps) that is currently incurred when processing StreamRecord in
scenarios where StreamRecord does not need a timestamp (which is known at job
compile time). I suppose we both agree that this can be achieved as of the
current Flink implementation.

In the future, if we have more use-cases that require the addition of new
sub-types of StreamElement, I believe we should still be able to address
such use-cases without breaking the optimization introduced in this FLIP.
The intuition is that if the use-case knows for sure there is only one type
of StreamRecord, then the job should **always** be able to hardcode this
logic at compile time instead of checking the record type on a per-record
basis at runtime.
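
To illustrate the intuition with a generic sketch (this is not Flink's actual
serialization/processing code, just a hypothetical example of choosing a
specialized path once at setup time instead of branching per record):

    // Hypothetical sketch: the cheap path is selected once, based on information
    // available at job compile/deploy time, so the hot loop never checks the
    // element type or carries a timestamp it does not need.
    interface RecordPath<T> {
        void emit(T record);
    }

    final class TimestampedPath<T> implements RecordPath<T> {
        @Override
        public void emit(T record) {
            // write tag byte + timestamp + payload (simplified placeholder)
        }
    }

    final class PlainRecordPath<T> implements RecordPath<T> {
        @Override
        public void emit(T record) {
            // write only the payload; no tag, no timestamp (simplified placeholder)
        }
    }

    final class ChannelWriter<T> {
        private final RecordPath<T> path;

        ChannelWriter(boolean timestampsNeeded) {
            // Decided once, not per record.
            this.path = timestampsNeeded ? new TimestampedPath<>() : new PlainRecordPath<>();
        }

        void emit(T record) {
            path.emit(record);
        }
    }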

Also note that we might not have to add new subtypes of StreamRecord in
order to achieve the future use-case. It seems like an implementation
detail that should be decided based on the concrete use-case.

If this is not the case, can you please provide a concrete use-case (can be
a fabricated one) where this optimization cannot be achieved? This can
help us evaluate whether this concern is realistic.


>
> 2. Changing LatencyMarker into a RuntimeEvent is problematic. The purpose
> of LatencyMarker is to measure the end-to-end latency of a record traveling
> through the data flow. RuntimeEvents are consumed with higher priority than
> StreamRecord in the shuffle layer, which may introduce bias on the
> measurement result.
>

Thank you for mentioning this. I think we can update the proposed change
section (and the Flink implementation) so that LatencyMarker will be
consumed with the same priority as StreamRecord. Would that address your
concern (apart from the possible implementation complexity)?


>
> 3. The proposed configuration option is, TBH, hard for users to understand.
> It requires in-depth understanding of the Flink internals. But this is
> probably not a big problem, if the plan is to eventually make this feature
> default and remove the configuration option.
>

I guess we both agree that it won't be a problem in the long term since we
should be able to remove this config with new operator APIs (e.g. we can
tighten the operator API, or add operator attribute to specify whether it
needs timestamp).

In the short term, this also won't be a problem since the optimization is
turned off by default and users won't even need to know about this config.
Advanced users can take advantage of this config to 

Re: [DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

2023-07-24 Thread Dong Lin
Hi David,

Thank you for the detailed comments and the suggestion of this alternative
approach.

I agree with you that this alternative can also address the target use-case
with the same correctness. In comparison to the current FLIP, this
alternative indeed introduces much less complexity to the Flink runtime
internal implementation.

At a high level, this alternative is simulating a one-time emission of
Watermark(useProcessingTime=true) with periodic emission of
Watermark(timestamp=wall-clock-time).

One downside of this alternative is that it can introduce a bit of extra
per-record runtime overhead. This is because the ingestion time watermark
will be emitted periodically according to pipeline.auto-watermark-interval
(200 ms by default). Thus there is still a short period where the watermark
from the HybridSource can be lagging behind wall-clock time. For operators
whose logic depends on the watermark, such as TemporalRowTimeJoinOperator,
they will need to check the build-side watermark and delay/buffer records on
the probe side until the next ingestion-time watermark arrives.

The impact of this overhead probably depends on the throughput/watermark of
the probe-side records. On the other hand, given that the join operator is
typically already heavy (due to state backend access and build-side
buffer), and the watermark from the probe side (e.g. Kafka) is probably also
lagging behind wall-clock time, it is probably not an issue in most cases.
Therefore I agree that it is worth trying this approach. We can revisit
this issue if we run into any issues around performance or usability of this
approach.
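
For reference, here is a minimal sketch of such an ingestion-time strategy
built from existing public APIs (the helper class name is made up; timestamps
and watermarks simply track the wall clock at ingestion):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class IngestionTimeWatermarks {
        public static <T> WatermarkStrategy<T> ingestionTime() {
            return WatermarkStrategy
                    .<T>forMonotonousTimestamps()
                    // Assign the current wall-clock time as the event timestamp, so the
                    // periodically emitted watermark tracks processing time closely.
                    .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis())
                    // Optional: keep watermarks advancing if a partition goes idle.
                    .withIdleness(Duration.ofMinutes(1));
        }
    }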

Another potential concern is that it requires the user to use ingestion
time. I am not sure we are able to do this in a backward-compatible way
yet. We probably need to go through the existing APIs around ingestion time
watermark to validate this.

BTW, with the introduction of RecordAttributes(isBacklog=true/false) from
FLIP-327,
another short-term approach is to let TemporalProcessTimeJoinOperator keep
buffering records from MySQL/HybridSource as long as isBacklog=true, and
process them in a processing-time manner once it receives isBacklog=false.
This should also address the use-case targeted by FLIP-326. The only caveat
with this approach is that it is a bit hacky, because it requires
the JoinOperator to always buffer records when isBacklog=true, whereas
isBacklog's semantics only says it is "optional" to buffer records, which
can be an issue in the long term.
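
A generic sketch of that buffering idea (this is not the actual
TemporalProcessTimeJoinOperator code, just a hypothetical helper showing the
buffer-while-backlog / drain-on-switch behavior):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    final class BacklogBufferingProcessor<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<T> downstream;
        private boolean isBacklog = true;

        BacklogBufferingProcessor(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        // Called when a RecordAttributes(isBacklog=...) event is received.
        void onBacklogStatus(boolean backlog) {
            if (this.isBacklog && !backlog) {
                buffer.forEach(downstream);  // drain everything buffered during backlog
                buffer.clear();
            }
            this.isBacklog = backlog;
        }

        // Called for each input record.
        void onRecord(T record) {
            if (isBacklog) {
                buffer.add(record);
            } else {
                downstream.accept(record);
            }
        }
    }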

Thanks,
Dong

On Tue, Jul 25, 2023 at 2:37 AM David Anderson  wrote:

> I'm delighted to see interest in developing support for
> processing-time temporal joins.
>
> The proposed implementation seems rather complex, and I'm not
> convinced this complexity is justified/necessary. I'd like to outline
> a simpler alternative that I think would satisfy the key objectives.
>
> Key ideas:
>
> 1. Limit support to the HybridSource (or a derivative thereof). (E.g.,
> I'm guessing the MySQL CDC Source could be reworked to be a hybrid
> source.)
> 2. Have this HybridSource wait to begin emitting watermarks until it
> has handled all events from the bounded sources. (I'm not sure how the
> HybridSource handles this now; if this is an incompatible change, we
> can find a way to deal with that.)
> 3. Instruct users to use an ingestion time watermarking strategy for
> their unbounded source (the source the HybridSource handles last) if
> they want to do something like a processing time temporal join.
>
> One objection to this is the limitation of only supporting the
> HybridSource -- what about cases where the user has a single source,
> e.g., a Kafka topic? I'm suggesting the user would divide their
> build-side stream into two parts -- a bounded component that is fully
> ingested by the hybrid source before watermarking begins, followed by
> an unbounded component.
>
> I think this alternative handles use cases like processing-time
> temporal join rather nicely, without requiring any changes to
> watermarks or the core runtime.
>
> David
>
> On Thu, Jun 29, 2023 at 1:39 AM Martijn Visser 
> wrote:
> >
> > Hi Dong and Xuannan,
> >
> > I'm excited to see this FLIP. I think support for processing-time
> > temporal joins is something that the Flink users will greatly benefit
> > off. I specifically want to call-out that it's great to see the use
> > cases that this enables. From a technical implementation perspective,
> > I defer to the opinion of others with expertise on this topic.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Sun, Jun 25, 2023 at 9:03 AM Xuannan Su 
> wrote:
> > >
> > > Hi all,
> > >
> > > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > > enhance the watermark to properly support processing-time temporal
> > > join, which has been documented in FLIP-326 [1].
> > >
> > > We want to 

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-23 Thread Dong Lin
 up FLIP we could restart the discussion
> about
> switching back and forth.
>

Cool, I added the following statement to the motivation section.

"NOTE: this FLIP focuses only on the capability to switch from batch to
stream mode. If there is any extra API needed to support switching from
stream to batch mode, we will discuss them in a follow-up FLIP."

I am looking forward to reading your follow-up thoughts!

Best,
Dong


> Piotrek
>
> On Thu, Jul 20, 2023 at 4:57 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thank you for the very detailed comments! Please see my reply inline.
> >
> > On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > I have a couple of follow up questions about switching back and forth
> > > between streaming and batching mode.
> > > Especially around shuffle/watermark strategy, and keyed state backend.
> > >
> > > First of all, it might not always be beneficial to switch into the
> batch
> > > modes:
> > > - Shuffle strategy
> > > - Is sorting going to be purely in-memory? If not, obviously
> spilling
> > > to disks might cause larger overheads
> > >compared to not sorting the records.
> > >
> >
> > Sorting might require spilling data to disk depending on the input size.
> > The behavior of sorting w.r.t. memory/disk is expected to be exactly the
> > same as the behavior of input sorting automatically performed by Flink
> > runtime in batch mode for keyed inputs.
> >
> > More specifically, ExternalSorter
> > (https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java)
> > is currently used to sort keyed inputs in batch mode. It is automatically
> > used by Flink runtime in OneInputStreamTask (here:
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114)
> > and in MultiInputSortingDataInput (here:
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188).
> > We plan to re-use the same code/mechanism to do sorting.
> >
> > It is possible that spilling to disks might cause larger overhead. IMO it
> > is an orthogonal issue already existing in Flink. This is because a Flink
> > job running batch mode might also be slower than its throughput in stream
> > mode due to the same reason. However, even though it is possible in
> theory,
> > I expect that in practice the throughput of using sorting +
> > BatchExecutionKeyedStateBackend should be much higher than using other
> > keyed statebackends when the amount of data is large. As a matter of
> fact,
> > we have not heard of complaints of such performance regression issues in
> > batch mode.
> >
> > The primary goal of this FLIP is to allow the operator to run at the same
> > throughput (in stream mode when there is backlog) as it can currently do
> in
> > batch mode. And this goal is not affected by the disk overhead issue
> > mentioned above.
> >
> > I am thinking maybe we can treat it as an orthogonal performance
> > optimization problem instead of solving this problem in this FLIP?
> >
> > - If it will be at least partially in-memory, does Flink have some
> > > mechanism to reserve optional memory that
> > >   can be revoked if a new operator starts up? Can this memory be
> > > redistributed? Ideally we should use as
> > >   much as possible of the available memory to avoid spilling costs,
> > but
> > > also being able to revoke that memory
> > >
> >
> > This FLIP does not support dynamically revoking/redistributing managed
> > memory used by the ExternalSorter.
> >
> > For operators with isInternalSorterSupported = true, we will allocate to
> > this operator execution.sorted-inputs.memory
> > (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144)
> > amount of managed memory. This is the same as how Flink allocates managed
> > memory to an operator when this operator has keyed inputs in batch mode.
> >
> > Note that this FLIP intends to support operators to sort inputs whenever
> > there is backlog. And there i

[RESULT][VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-20 Thread Dong Lin
Hi all,

Thanks everyone for your review and the votes!

I am happy to announce that FLIP-309: Support using larger checkpointing
interval when source is processing backlog [1] has been accepted.

There are 11 binding votes and 1 non-binding vote [2]:
- Rui Fan (binding)
- Jing Ge (binding)
- Piotr Nowojski (binding)
- Jark Wu (binding)
- Jingsong Li (binding)
- Leonard Xu (binding)
- Guowei Ma (binding)
- Yu Xia (binding)
- Zhu Zhu (binding)
- Stefan Richter (binding)
- Dong Lin (binding)
- Hang Ruan

There is no disapproving vote.

Cheers,
Dong

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
[2] https://lists.apache.org/thread/1rl1hm7ssr9hyb20p7m20gzt6j245j8n


Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-20 Thread Dong Lin
heckpointing can explode
> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 >= #records
> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
>
> Or even maybe for starters something even simpler and then test out
> something more fancy as a follow up?
>

I agree it is worth investigating these ideas to further optimize the
performance during backlog.

I just think these can be done independently after this FLIP. The focus of
this FLIP is to re-use in stream mode the same optimization which we
already use in batch mode, rather than inventing or improving the
performance of these existing optimizations.

Given that there are already a lot of new mechanisms/features to discuss and
address in this FLIP, I am hoping we can limit the scope of this FLIP to
re-using the existing optimization, and pursue these extra optimization
opportunities as future work.

What do you think?


>
> At the same time, `execution.checkpointing.interval-during-backlog=0` seems
> a weird setting to me, that I would
> not feel safe recommending to anyone. If processing of a backlog takes a
> long time, a job might stop making
> any progress due to some random failures. Especially dangerous if a job
> switches from streaming mode back to
> backlog processing due to some reasons, as that could happen months after
> someone started a job with this
> strange setting. So should we even have it? I would simply disallow it. I
>

Good point. I agree we need further work to improve the failover
performance in case any task fails.

As of the current FLIP, if any task fails during backlog processing and
execution.checkpointing.interval-during-backlog = 0, we will need to
restart all operators from the last checkpointed state and continue
processing the backlog. This can mean a lot of rollback since there is no
checkpoint during backlog. It can also be worse than batch mode, since this
FLIP currently does not support exporting/saving records to local disk (or
a shuffle service) so that a failed task can re-consume the records from the
upstream task (or shuffle service) in the same way that Flink fails over a
task in batch mode.

I think we can extend this FLIP to solve this problem so that it can have
at least the same behavior/performance as a batch-mode job. The idea is to
also follow what batch mode does. For example, we can trigger a checkpoint
when isBacklog switches to true, and every operator should buffer its
output in the TM local disk (or remote shuffle service). Therefore, after a
task fails, it can restart from the last checkpoint and re-consume data
buffered in the upstream task.

I will update FLIP as described above. Would this address your concern?



> could see a power setting like:
> `execution.backlog.use-full-batch-mode-on-start (default false)`
>

I am not sure I fully understand this config or its motivation. Can you
help explain the exact semantics of this config?


> that would override any heuristic of switching to backlog if someone is
> submitting a new job that starts with
> `isBacklog=true`.
>
> Or we could limit the scope of this FLIP to only support starting with
> batch mode and switching only once to
> streaming, and design a follow up with switching back and forth?
>

Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs
so that we can make incremental progress.

Best,
Dong


> I'm looking forwards to hearing/reading out your thoughts.
>
> Best,
> Piotrek
>
>
> On Wed, Jul 12, 2023 at 12:38 PM Jing Ge  wrote:
>
> > Hi Dong,
> >
> > Thanks for your reply!
> >
> > Best regards,
> > Jing
> >
> > On Wed, Jul 12, 2023 at 3:25 AM Dong Lin  wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the comments. Please see my reply inline.
> > >
> > > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the clarification. Now it is clear for me. I got
> additional
> > > noob
> > > > questions wrt the internal sorter.
> > > >
> > > > 1. when to call setter to set the internalSorterSupported to be true?
> > > >
> > >
> > > Developer of the operator class (i.e. those classes which implements
> > > `StreamOperator`) should override the `#getOperatorAttributes()` API to
> > set
> > > internalSorterSupported to true, if he/she decides to sort records
> > > internally in the operator.
> > >
> > >
> > > > 2
> > > > *"For those operators whose throughput can be considerably improved
> > with
> > > an
> > > > internal sorter, update it to take advantage of the internal sorter
> > when
> > > > its input has isBa

[VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-17 Thread Dong Lin
Hi all,

We would like to start the vote for FLIP-309: Support using larger
checkpointing interval when source is processing backlog [1]. This FLIP was
discussed in this thread [2].

The vote will be open until at least July 21st (at least 72 hours), following
the consensus voting process.

Cheers,
Yunfeng and Dong

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
[2] https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-17 Thread Dong Lin
Hi all,

I have updated FLIP-309 as suggested by Piotr to include a reference to
FLIP-328 in the future work section.

Piotr, Stephan, and I discussed offline the choice
between execution.checkpointing.max-interval and
execution.checkpointing.interval-during-backlog.
The advantage of using "max-interval" is that Flink runtime can have more
flexibility to decide when/how to adjust checkpointing intervals (based on
information other than backlog). The advantage of using
"interval-during-backlog" is that it is clearer to the user when/how this
configured interval is used. Since there is no immediate need for the extra
flexibility as of this FLIP, we agreed to go with interval-during-backlog
for now. And we can rename this config to e.g.
execution.checkpointing.max-interval when needed in the future.

Thanks everyone for all the reviews and suggestions! And special thanks to
Piotr and Stephan for taking extra time to provide detailed reviews and
suggestions offline!

Since there is no further comment, I will open the voting thread for this
FLIP.

Cheers,
Dong


On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski 
wrote:

> Hi All,
>
> We had a lot of off-line discussions. As a result I would suggest dropping
> the idea of introducing an end-to-end-latency concept, until
> we can properly implement it, which will require more designing and
> experimenting. I would suggest starting with a more manual solution,
> where the user needs to configure concrete parameters, like
> `execution.checkpointing.max-interval` or `execution.flush-interval`.
>
> FLIP-309 looks good to me, I would just rename
> `execution.checkpointing.interval-during-backlog` to
> `execution.checkpointing.max-interval`.
>
> I would also reference future work, that a solution that would allow set
> `isProcessingBacklog` for sources like Kafka will be introduced via
> FLIP-328 [1].
>
> Best,
> Piotrek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
>
> On Wed, Jul 12, 2023 at 3:49 AM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > I think I understand your motivation for suggesting
> > execution.slow-end-to-end-latency now. Please see my followup comments
> > (after the previous email) inline.
> >
> > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the updates, a couple of comments:
> > >
> > > > If a record is generated by a source when the source's
> > > isProcessingBacklog is true, or some of the records used to
> > > > derive this record (by an operator) has isBacklog = true, then this
> > > record should have isBacklog = true. Otherwise,
> > > > this record should have isBacklog = false.
> > >
> > > nit:
> > > I think this conflicts with "Rule of thumb for non-source operators to
> > set
> > > isBacklog = true for the records it emits:"
> > > section later on, when it comes to a case if an operator has mixed
> > > isBacklog = false and isBacklog = true inputs.
> > >
> > > > execution.checkpointing.interval-during-backlog
> > >
> > > Do we need to define this as an interval config parameter? Won't that
> add
> > > an option that will be almost instantly deprecated
> > > because what we actually would like to have is:
> > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > >
> >
> > I guess you are suggesting that we should allow users to specify a higher
> > end-to-end latency budget for records that are emitted by a two-phase
> > commit sink than for records that are emitted by a non-two-phase commit
> > sink.
> >
> > My concern with this approach is that it will increase the complexity of
> > the definition of "processing latency requirement", as well as the
> > complexity of the Flink runtime code that handles it. Currently, the
> > FLIP-325 defines end-to-end latency as an attribute of the records that
> is
> > statically assigned when the record is generated at the source,
> regardless
> > of how it will be emitted later in the topology. If we make the changes
> > proposed above, we would need to define the latency requirement w.r.t.
> the
> > attribute of the operators that it travels through before its result is
> > emitted, which is less intuitive and more complex.
> >
> > For now, it is not clear whether it is necessary to have two categories
> of
> > latency requirement for the same job. Maybe it is reasonable to assume
> that

[jira] [Created] (FLINK-32594) Use blocking ResultPartitionType if operator only outputs records on EOF (FLIP-331)

2023-07-15 Thread Dong Lin (Jira)
Dong Lin created FLINK-32594:


 Summary: Use blocking ResultPartitionType if operator only outputs 
records on EOF (FLIP-331)
 Key: FLINK-32594
 URL: https://issues.apache.org/jira/browse/FLINK-32594
 Project: Flink
  Issue Type: New Feature
Reporter: Dong Lin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr,

I think I understand your motivation for suggesting
execution.slow-end-to-end-latency now. Please see my followup comments
(after the previous email) inline.

On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Thanks for the updates, a couple of comments:
>
> > If a record is generated by a source when the source's
> isProcessingBacklog is true, or some of the records used to
> > derive this record (by an operator) has isBacklog = true, then this
> record should have isBacklog = true. Otherwise,
> > this record should have isBacklog = false.
>
> nit:
> I think this conflicts with "Rule of thumb for non-source operators to set
> isBacklog = true for the records it emits:"
> section later on, when it comes to a case if an operator has mixed
> isBacklog = false and isBacklog = true inputs.
>
> > execution.checkpointing.interval-during-backlog
>
> Do we need to define this as an interval config parameter? Won't that add
> an option that will be almost instantly deprecated
> because what we actually would like to have is:
> execution.slow-end-to-end-latency and execution.end-to-end-latency
>

I guess you are suggesting that we should allow users to specify a higher
end-to-end latency budget for records that are emitted by a two-phase
commit sink than for records that are emitted by a non-two-phase commit
sink.

My concern with this approach is that it will increase the complexity of
the definition of the "processing latency requirement", as well as the
complexity of the Flink runtime code that handles it. Currently, FLIP-325
defines end-to-end latency as an attribute of a record that is statically
assigned when the record is generated at the source, regardless of how it
will be emitted later in the topology. If we make the changes proposed
above, we would need to define the latency requirement w.r.t. the
attributes of the operators that the record travels through before its
result is emitted, which is less intuitive and more complex.

For now, it is not clear whether it is necessary to have two categories of
latency requirements for the same job. Maybe it is reasonable to assume that
if a job has a two-phase commit sink and the user is OK with emitting some
results at a 1-minute interval, then more likely than not the user is also
OK with emitting all results at a 1-minute interval, including those that go
through a non-two-phase commit sink?

If we do want to support different end-to-end latencies depending on whether
the records are emitted by a two-phase commit sink, I would prefer to still
use execution.checkpointing.interval-during-backlog instead of
execution.slow-end-to-end-latency. This allows us to keep the concept of
end-to-end latency simple. Also, by explicitly including "checkpointing
interval" in the name of the config that directly affects checkpointing
interval, we can make it easier and more intuitive for users to understand
the impact and set proper value for such configs.

What do you think?

Best,
Dong


> Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe a
> better name), and for the time being
> use it as the checkpoint interval value during backlog?


> Or do you envision that in the future users will be configuring only:
> - execution.end-to-end-latency
> and only optionally:
> - execution.checkpointing.interval-during-backlog
> ?
>
> Best Piotrek
>
> PS, I will read the summary that you have just published later, but I think
> we don't need to block this FLIP on the
> existence of that high level summary.
>
> On Tue, Jul 11, 2023 at 5:49 PM Dong Lin  wrote:
>
> > Hi Piotr and everyone,
> >
> > I have documented the vision with a summary of the existing work in this
> > doc. Please feel free to review/comment/edit this doc. Looking forward to
> > working with you together in this line of work.
> >
> >
> >
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> >
> > Best,
> > Dong
> >
> > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Me and Dong chatted offline about the above mentioned issues (thanks
> for
> > > that offline chat
> > > I think it helped both of us a lot). The summary is below.
> > >
> > > > Previously, I thought you meant to add a generic logic in
> > > SourceReaderBase
> > > > to read existing metrics (e.g. backpressure) and emit the
> > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > > misunderstood your suggestions.
> > > >
> > > > After double-checking your previous suggestion, I am wondering if you
> > are
> > >

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-11 Thread Dong Lin
Hi Jing,

Thanks for the comments. Please see my reply inline.

On Wed, Jul 12, 2023 at 5:04 AM Jing Ge  wrote:

> Hi Dong,
>
> Thanks for the clarification. Now it is clear for me. I got additional noob
> questions wrt the internal sorter.
>
> 1. when to call setter to set the internalSorterSupported to be true?
>

The developer of the operator class (i.e. those classes which implement
`StreamOperator`) should override the `#getOperatorAttributes()` API to set
internalSorterSupported to true, if he/she decides to sort records
internally in the operator.
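For example, based on the OperatorAttributes / OperatorAttributesBuilder API
proposed in this FLIP (exact class and method names may still change), a
rough sketch could look like:

    public class MySortingOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public OperatorAttributes getOperatorAttributes() {
            // Declare that this operator sorts its input internally, so the
            // runtime does not need to inject external input sorting for it.
            return new OperatorAttributesBuilder()
                    .setInternalSorterSupported(true)
                    .build();
        }

        // processElement(...) buffers/sorts records internally (omitted here).
    }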


> 2
> *"For those operators whose throughput can be considerably improved with an
> internal sorter, update it to take advantage of the internal sorter when
> its input has isBacklog=true.*
> *Typically, operators that involve aggregation operation (e.g. join,
> cogroup, aggregate) on keyed inputs can benefit from using an internal
> sorter."*
>
> *"The operator that performs CoGroup operation will instantiate two
> internal sorter to sorts records from its two inputs separately. Then it
> can pull the sorted records from these two sorters. This can be done
> without wrapping input records with TaggedUnion<...>. In comparison, the
> existing DataStream#coGroup needs to wrap input records with
> TaggedUnion<...> before sorting them using one external sorter, which
> introduces higher overhead."*
>
> According to the performance test, it seems that internal sorter has better
> performance than external sorter. Is it possible to make those operators
> that can benefit from it use internal sorter by default?
>

Yes, it is possible. After this FLIP is done, users can use
DataStream#coGroup with EndOfStreamWindows as the window assigner to
co-group two streams effectively in a batch manner. An operator that uses
an internal sorter will be used to perform the co-group operation. There is
no need for users of the DataStream API to explicitly know or set the
internal sorter in any way.
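For illustration, the user-facing program could look roughly like the sketch
below (EndOfStreamWindows is the window assigner proposed in this line of
work; the key selectors and co-group logic are just placeholders):

    DataStream<String> left = env.fromElements("a", "b");
    DataStream<String> right = env.fromElements("a", "c");

    DataStream<String> coGrouped = left
        .coGroup(right)
        .where(record -> record)      // placeholder key selectors
        .equalTo(record -> record)
        // EndOfStreamWindows assigns all records to a single window that only
        // fires at the end of input, so the operator can sort/buffer records
        // internally and emit the co-grouped results in a batch-like manner.
        .window(EndOfStreamWindows.get())
        .apply(new CoGroupFunction<String, String, String>() {
            @Override
            public void coGroup(
                    Iterable<String> first, Iterable<String> second, Collector<String> out) {
                out.collect(first + " | " + second);   // placeholder co-group logic
            }
        });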

In the future, we plan to incrementally optimize other aggregation
operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows is
used as the window assigner.

Best,
Dong


>
> Best regards,
> Jing
>
>
> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin  wrote:
>
> > Hi Jing,
> >
> > Thank you for the comments! Please see my reply inline.
> >
> > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the proposal! The FLIP is already in good shape. I got some
> > NIT
> > > questions.
> > >
> > > 1. It is a little bit weird to write the hint right after the
> motivation
> > > that some features have been moved to FLIP-331, because at that time,
> > > readers don't know the context about what features does it mean. I
> would
> > > suggest moving the note to the beginning of "Public interfaces"
> sections.
> > >
> >
> > Given that the reviewer who commented on this email thread before I
> > refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler
> to
> > just remove any mention of FLIP-331. I have updated the FLIP accordingly.
> >
> >
> > > 2. It is also a little bit weird to describe all behaviour changes at
> > first
> > > but only focus on one single feature, i.e. how to implement
> > > internalSorterSupported. TBH, I was lost while I was reading the Public
> > > interfaces. Maybe change the FLIP title? Another option could be to
> > write a
> > > short summary of all features and point out that this FLIP will only
> > focus
> > > on the internalSorterSupported feature. Others could be found in
> > FLIP-331.
> > > WDYT?
> > >
> >
> > Conceptually, the purpose of this FLIP is to allow a stream mode job to
> run
> > parts of the topology in batch mode so that it can apply
> > optimizations/computations that can not be used together with
> checkpointing
> > (and thus not usable in stream mode). Although internal sorter is the
> only
> > optimization immediately supported in this FLIP, this FLIP lays the
> > foundation to support other optimizations in the future, such as using
> GPU
> > to process a bounded stream of records.
> >
> > Therefore, I find it better to keep the current title rather than
> limiting
> > the scope to internal sorter. What do you think?
> >
> >
> >
> > > 3. There should be a typo at 4) Checkpoint and failover strategy ->
> Mixed
> > > mode ->
> > >
> > >- If any task fails when isBacklog=false true, this task is
> restarted
> >

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Please see my reply inline.

On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> Thanks for the updates, a couple of comments:
>
> > If a record is generated by a source when the source's
> isProcessingBacklog is true, or some of the records used to
> > derive this record (by an operator) has isBacklog = true, then this
> record should have isBacklog = true. Otherwise,
> > this record should have isBacklog = false.
>
> nit:
> I think this conflicts with "Rule of thumb for non-source operators to set
> isBacklog = true for the records it emits:"
> section later on, when it comes to a case if an operator has mixed
> isBacklog = false and isBacklog = true inputs.
>

Hmm... I double checked these paragraphs but could not find the conflicts.
Maybe I have missed something. Could you help explain what the conflict is?


>
> > execution.checkpointing.interval-during-backlog
>
> Do we need to define this as an interval config parameter? Won't that add
> an option that will be almost instantly deprecated
> because what we actually would like to have is:
> execution.slow-end-to-end-latency and execution.end-to-end-latency
>
> Maybe we can introduce only `execution.slow-end-to-end-latency` (or maybe a
> better name), and for the time being
> use it as the checkpoint interval value during backlog?
>

Good point!

I think it is feasible (and simpler in the long term) to have only 2
configs (i.e. execution.checkpointing.interval,
execution.end-to-end-latency), instead of 3 configs (e.g.
execution.checkpointing.interval, execution.end-to-end-latency,
execution.checkpointing.interval-during-backlog), with the following
semantics:

1) *execution.checkpointing.interval*
It is used as the checkpointing interval if any of the following conditions
are true.
- execution.end-to-end-latency is set to null
- the job does not have any two-phase-commit sink that is processing
non-backlog records.

Typically, users should set execution.checkpointing.interval to upper-bound
the amount of work that will be redone after a job failover.

2) *execution.end-to-end-latency*
It is the processing latency requirement for non-backlog records. If it is
not null, and if a two-phase commit sink is processing non-backlog records,
then the checkpointing interval will be set to
execution.end-to-end-latency. Its default value is null.
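To make the intended usage of these two configs concrete, a rough sketch
(execution.end-to-end-latency is the proposed key and may still be renamed):

    Configuration conf = new Configuration();
    // Upper-bounds the amount of work that must be redone after a failover.
    conf.setString("execution.checkpointing.interval", "10 min");
    // Proposed config: latency target for non-backlog records; used as the
    // effective checkpointing interval while a two-phase commit sink is
    // processing non-backlog records. Unset (null) by default.
    conf.setString("execution.end-to-end-latency", "30 s");
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);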

In order to achieve this goal, in addition to renaming the config from
execution.checkpointing.interval-during-backlog to
execution.end-to-end-latency, we will need to add RecordAttributes (from
FLIP-325) to propagate the isBacklog value to the sink operator, add a
public API for the sink operator, and update every two-phase commit sink
operator to report its isBacklog status to the JM so that the JM can adjust
the checkpointing interval.

Overall I would also prefer to take the long term approach rather than
adding a config that we will deprecate in the near future.

If this looks good to you overall, I will update the FLIP as described
above.

BTW, I am not sure I get the idea of why we need both
execution.slow-end-to-end-latency and execution.end-to-end-latency, or why
we need to have "slow" in the config name. Can you help explain it?

What do you think?

Best,
Dong


> Or do you envision that in the future users will be configuring only:
> - execution.end-to-end-latency
> and only optionally:
> - execution.checkpointing.interval-during-backlog
> ?
>
> Best Piotrek
>
> PS, I will read the summary that you have just published later, but I think
> we don't need to block this FLIP on the
> existence of that high level summary.
>
> On Tue, Jul 11, 2023 at 5:49 PM Dong Lin  wrote:
>
> > Hi Piotr and everyone,
> >
> > I have documented the vision with a summary of the existing work in this
> > doc. Please feel free to review/comment/edit this doc. Looking forward to
> > working with you together in this line of work.
> >
> >
> >
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
> >
> > Best,
> > Dong
> >
> > On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Me and Dong chatted offline about the above mentioned issues (thanks
> for
> > > that offline chat
> > > I think it helped both of us a lot). The summary is below.
> > >
> > > > Previously, I thought you meant to add a generic logic in
> > > SourceReaderBase
> > > > to read existing metrics (e.g. backpressure) and emit the
> > > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > > misunderstood your suggetions.
> > > >
> > > > After double-checki

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> > usage
>
> After thinking this over:
> - the case that we don't have "isProcessingBacklog" information, but the
> source operator is
>   back pressured, must be intermittent. Either back pressure will go away,
> or shortly we should
>   reach the "isProcessingBacklog" state anyway
> - and even if we implement some back pressure detecting algorithm to switch
> the runtime into the
>   "high latency mode", we can always report that as "isProcessingBacklog"
> anyway, as runtime should
>react the same way in both cases (backpressure and "isProcessingBacklog
> states).
>
> ===
>
> With a common understanding of the final solution that we want to have in
> the future, I'm pretty much fine with the current
> FLIP-309 proposal, with a couple of remarks:
> 1. Could you include in the FLIP-309 the long term solution as we have
> discussed.
> a) Would be nice to have some diagram showing how the
> "isProcessingBacklog" information would be travelling,
>  being aggregated and what will be done with that information.
> (from SourceReader/SplitEnumerator to some
> "component" aggregating it, and then ... ?)
> 2. For me "processing backlog" doesn't necessarily equate to "backpressure"
> (HybridSource can be
> both NOT backpressured and processing backlog at the same time). If you
> think the same way, can you include that
> definition of "processing backlog" in the FLIP including its relation
> to the backpressure state? If not, we need to align
> on that definition first :)
>
> Also I'm missing a big picture description, that would show what are you
> trying to achieve and what's the overarching vision
> behind all of the current and future FLIPs that you are planning in this
> area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
> Or was it described somewhere and I've missed it?
>
> Best,
> Piotrek
>
>
>
> On Thu, Jul 6, 2023 at 6:25 AM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > I am sorry if you feel unhappy or upset with us for not following/fixing
> > your proposal. It is not my intention to give you this feeling. After
> all,
> > we are all trying to make Flink better, to support more use-case with the
> > most maintainable code. I hope you understand that just like you, I have
> > also been doing my best to think through various design options and
> taking
> > time to evalute the pros/cons. Eventually, we probably still need to
> reach
> > consensus by clearly listing and comparing the objective pros/cons of
> > different proposals and identifying the best choice.
> >
> > Regarding your concern (or frustration) that we are always finding issues
> > in your proposal, I would say it is normal (and probably necessary) for
> > developers to find pros/cons in each other's solutions, so that we can
> > eventually pick the right one. I will appreciate anyone who can correctly
> > pinpoint the concrete issue in my proposal so that I can improve it or
> > choose an alternative solution.
> >
> > Regarding your concern that we are not spending enough effort to find
> > solutions and that the problem in your solution can be solved in a
> minute,
> > I would like to say that is not true. For each of your previous
> proposals,
> > I typically spent 1+ hours thinking through your proposal to understand
> > whether it works and why it does not work, and another 1+ hour to write
> > down the details and explain why it does not work. And I have had a
> variety
> > of offline discussions with my colleagues discussing various proposals
> > (including yours) with 6+ hours in total. Maybe I am not capable enough
> to
> > fix those issues in one minute or so so. If you think your proposal can
> be
> > easily fixed in one minute or so, I would really appreciate it if you can
> > think through your proposal and fix it in the first place :)
> >
> > For your information, I have had several long discussions with my
> > colleagues at Alibaba and also Becket on this FLIP. We have seriously
> > considered your proposals and discussed in detail what are the pros/cons
> > and whether we can improve these solutions. The initial version of this
> > FLIP (which allows the source operator to specify checkpoint intervals)
> > does not get enough support due to concerns of not being generic (i.e.
> > users need to specify checkpoint intervals on a per-sourc

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-11 Thread Dong Lin
Hi Jing,

Thank you for the comments! Please see my reply inline.

On Tue, Jul 11, 2023 at 5:41 AM Jing Ge  wrote:

> Hi Dong,
>
> Thanks for the proposal! The FLIP is already in good shape. I got some NIT
> questions.
>
> 1. It is a little bit weird to write the hint right after the motivation
> that some features have been moved to FLIP-331, because at that time,
> readers don't know the context about what features does it mean. I would
> suggest moving the note to the beginning of "Public interfaces" sections.
>

Given that the reviewer who commented on this email thread before I
refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler to
just remove any mention of FLIP-331. I have updated the FLIP accordingly.


> 2. It is also a little bit weird to describe all behaviour changes at first
> but only focus on one single feature, i.e. how to implement
> internalSorterSupported. TBH, I was lost while I was reading the Public
> interfaces. Maybe change the FLIP title? Another option could be to write a
> short summary of all features and point out that this FLIP will only focus
> on the internalSorterSupported feature. Others could be found in FLIP-331.
> WDYT?
>

Conceptually, the purpose of this FLIP is to allow a stream mode job to run
parts of the topology in batch mode so that it can apply
optimizations/computations that can not be used together with checkpointing
(and thus not usable in stream mode). Although internal sorter is the only
optimization immediately supported in this FLIP, this FLIP lays the
foundation to support other optimizations in the future, such as using GPU
to process a bounded stream of records.

Therefore, I find it better to keep the current title rather than limiting
the scope to internal sorter. What do you think?



> 3. There should be a typo at 4) Checkpoint and failover strategy -> Mixed
> mode ->
>
>- If any task fails when isBacklog=false true, this task is restarted to
>re-process its input from the beginning.
>
>
Thank you for catching this issue. It is fixed now.

Best,
Dong


>
>
> Best regards
> Jing
>
>
> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks for your comments! Please see my reply inline.
> >
> > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski  >
> > wrote:
> >
> > > Hi Dong,
> > >
> > > I have a couple of questions.
> > >
> > > Could you explain why those properties
> > >
> > > @Nullable private Boolean isOutputOnEOF = null;
> > > @Nullable private Boolean isOutputOnCheckpoint = null;
> > > @Nullable private Boolean isInternalSorterSupported = null;
> > >
> > > must be `@Nullable`, instead of having the default value set to
> `false`?
> > >
> >
> > By initializing these private variables in OperatorAttributesBuilder as
> > null, we can implement `OperatorAttributesBuilder#build()` in such a way
> > that it can print DEBUG level logging to say "isOutputOnCheckpoint is not
> > explicitly set". This can help user/SRE debug performance issues (or lack
> > of the expected optimization) due to operators not explicitly setting the
> > right operator attribute.
> >
> > For example, we might want a job to always use the longer checkpointing
> > interval (i.e. execution.checkpointing.interval-during-backlog) if all
> > running operators have isOutputOnCheckpoint==false, and use the short
> > checkpointing interval otherwise. If a user has explicitly configured the
> > execution.checkpointing.interval-during-backlog but the two-phase commit
> > sink library has not been upgraded to set isOutputOnCheckpoint=true, then
> > the job will end up using the long checkpointing interval, and it will be
> > useful to figure out what is going wrong in this case by checking the
> log.
> >
> > Note that the default value of these fields of the OperatorAttributes
> > instance built by OperatorAttributesBuilder will still be false. The
> > following is mentioned in the Java doc of
> > `OperatorAttributesBuilder#build()`:
> >
> >  /**
> >   * If any operator attribute is null, we will log it at DEBUG level and
> > use the following
> >   * default values.
> >   * - isOutputOnEOF defaults to false
> >   * - isOutputOnCheckpoint defaults to false
> >   * - isInternalSorterSupported defaults to false
> >   */
> >
> >
> > >
> > > Second question, have you thought about cases where someone is
> > > either bootstrapping from a streaming source like Kafka
> > > or simply trying to catch up after a long period o

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-11 Thread Dong Lin
Hi Stefan,

Thanks for all the comments! That is really helpful and I have updated the
FLIP based on your comments. Please see my reply inline.

On Mon, Jul 10, 2023 at 10:23 PM Stefan Richter
 wrote:

> Hi,
>
> After reading through the discussion, I think the FLIP should provide
> additional details and explanations about the exact semantics of the
> end-to-end latency configuration and how it interacts with all other
> configurations around latency, such as checkpointing. In this context, I
> have a few more questions:
>

Good point. I agree the FLIP should provide additional details as you
suggested. I have updated the FLIP with a "High-level Overview" section. Can
you see if that section could answer your questions?


>
> 1. If the idea of the configuration is to enable operators to apply some
> batching as optimization within the bounds of the
> configured latency, how would we use that information to globally optimize
> the operators in a job? Today, no operator knows about the assumed latency
> of its downstream operators. So if the goal is to provide a target latency
> for the whole pipeline, how do we plan to split the “budget” among all
> operators and how can operators locally decide how much latency is ok to
> introduce?


We will not split the "budget" among all operators.
Suppose execution.end-to-end-latency is configured to be larger than 0,
then each operator can *optionally* buffer records until its flush() method
is invoked by Flink runtime, which should happen roughly once every
configured interval. So we expect each operator to be able to buffer
records for up to the configured interval.

Also note that execution.end-to-end-latency is a loose contract and we do
not guarantee the actual end-to-end latency will always be within this
bound.
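As a rough sketch of what an operator implementation could do under this
contract (flush() here is the hook proposed in this FLIP, so the exact
signature may differ; partialCounts and output are illustrative
placeholders):

    // Buffer (pre-aggregate) per-key counts instead of emitting one record
    // per input element.
    private final Map<String, Long> partialCounts = new HashMap<>();

    public void processElement(StreamRecord<String> element) {
        partialCounts.merge(element.getValue(), 1L, Long::sum);
    }

    public void flush() {
        // Invoked by the runtime roughly once per configured
        // execution.end-to-end-latency; emit the buffered results and reset.
        for (Map.Entry<String, Long> entry : partialCounts.entrySet()) {
            output.collect(new StreamRecord<>(entry.getKey() + "=" + entry.getValue()));
        }
        partialCounts.clear();
    }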


>

> Or did I misunderstand and the configuration is per operator and adds up
> for the whole pipeline? How do window operators fit
> into this model, in particular if the window duration is longer than the
> configured end-to-end latency? Would they just forward and ignore flush
> events? But arguably this would lead far away from any end-to-end time
> guarantees.
>

Good question! This case is not covered by the current FLIP and it should
be explicitly explained.

After thinking about this, I think operators that currently buffer records
(e.g. window operator, two-phase commit sink) should do the following:
- When flush() is invoked, it records the fact that flush() has been
invoked (and the last ignored flushEventId) without actually emitting
records.
- When it is able to emit records (e.g. checkpoint triggered), it should
check whether a flush() has been invoked when it was buffering those
records. If yes, then it should additionally emit FlushEvent (using the
last ignored flushEventId) right after it emits the buffered records.

I have updated the public API and the proposed changes section so that we
can enforce the behavior described above. Can you check if it answers your
question?


> 2. How does this idea interact with checkpointing. I know this question
> was asked before and the answer was that this should be independent, but I
> don’t see how it can be independent for exactly-once where we should only
> be able to produce outputs on checkpoints. Would end-to-end latency config
> be rather useless if the checkpoint interval is greater than the end-to-end
> latency config? Or are you planning to adjust the checkpointing interval
> w.r.t. the latency config?
>

The following are mentioned as part of execution.end-to-end-latency's
description: "It's important to note that the actual latency can exceed
this configured value due to factors such as backlog, per-record processing
delays, or operators that hold records until the next checkpoint".

Briefly speaking, execution.end-to-end-latency does not provide a hard
guarantee since we are not able to guarantee it anyway. If the configs
conflict, such as when execution.end-to-end-latency <
execution.checkpointing.interval and there is a two-phase commit sink, the
semantics of the existing configuration should be enforced with higher priority
than execution.end-to-end-latency.

I updated the FLIP to include the following statement: "This config is
compatible with all existing configurations/operators in the sense that the
semantics of all existing configuration/operator will be preserved and
enforced with higher priority than execution.end-to-end-latency".

And yes, the end-to-end latency config might be useless if the checkpoint
interval is greater than the end-to-end latency config AND there is a
two-phase commit sink. But note that it might be useful if operators before
the two-phase commit sink are the performance bottleneck whose throughput
can increase with extra buffering time.

This FLIP will not adjust checkpointing intervals based on the latency
config. Users are free to adjust the config by themselves as appropriate.


>
> 3. Why do we need record attributes AND flush events? Couldn't the flush
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Dong Lin
other? If job is backpressured,
> > we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> > usage
>
> After thinking this over:
> - the case that we don't have "isProcessingBacklog" information, but the
> source operator is
>   back pressured, must be intermittent. Either back pressure will go away,
> or shortly we should
>   reach the "isProcessingBacklog" state anyway
> - and even if we implement some back pressure detecting algorithm to switch
> the runtime into the
>   "high latency mode", we can always report that as "isProcessingBacklog"
> anyway, as runtime should
>react the same way in both cases (backpressure and "isProcessingBacklog
> states).
>

I agree. Thanks for providing the thoughts and the summary.


>
> ===
>
> With a common understanding of the final solution that we want to have in
> the future, I'm pretty much fine with the current
> FLIP-309 proposal, with a couple of remarks:
> 1. Could you include in the FLIP-309 the long term solution as we have
> discussed.
> a) Would be nice to have some diagram showing how the
> "isProcessingBacklog" information would be travelling,
>  being aggregated and what will be done with that information.
> (from SourceReader/SplitEnumerator to some
> "component" aggregating it, and then ... ?)
>

Sure, I added a figure in FLIP-309 which shows an example job that joins
records from two inputs with different isProcessingBacklog status. The
figure shows that the isBacklog information travels through the job graph
via the RecordAttributes event.

Not sure this figure is sufficient though. I also added more text to
describe the definition, semantics, and rules of thumb for determining
the isBacklog value across operators in the job.

2. For me "processing backlog" doesn't necessarily equate to "backpressure"
> (HybridSource can be
> both NOT backpressured and processing backlog at the same time). If you
> think the same way, can you include that
> definition of "processing backlog" in the FLIP including its relation
> to the backpressure state? If not, we need to align
> on that definition first :)
>

Yes, I share the same thoughts with you and agree that  "processing
backlog" doesn't necessarily equate to "backpressure".

It is a good point that we should describe in more detail the definition
and the semantics of "processing backlog", as well as how that information
should be determined in each operator and propagated throughout the job
graph. I have updated FLIP-309 with a "High-level overview of
isProcessingBacklog" section to provide this information.

Could you take another look and see if there is further information needed?


>
> Also I'm missing a big picture description, that would show what are you
> trying to achieve and what's the overarching vision
> behind all of the current and future FLIPs that you are planning in this
> area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
> Or was it described somewhere and I've missed it?
>

Certainly, I agree it is useful to discuss the high-level vision behind
these FLIPs so that we can be on the same page when discussing these FLIPs
in collaboration.

The overall vision behind these FLIPs is to maximize Flink performance for
stream-batch unified use-cases (e.g. those use-cases that need to process a
bounded set of stale data followed by an unbounded set of fresh data).
Currently, even though Flink can generate correct results for these
use-cases, its performance can be considerably slower than what should be
achievable (e.g. if we can switch from batch to stream mode during the same
job execution). FLIP-309, FLIP-325, FLIP-327, FLIP-331 are aimed at
addressing this problem by improving different parts of the Flink
(checkpoint, operator, task scheduling etc.).

I will summarize the problem, vision, solution and related FLIPs in a
Google doc and share it with you later in this email thread, probably in 1-2 days.

Best,
Dong


>
> Best,
> Piotrek
>
>
>
> On Thu, Jul 6, 2023 at 6:25 AM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > I am sorry if you feel unhappy or upset with us for not following/fixing
> > your proposal. It is not my intention to give you this feeling. After
> all,
> > we are all trying to make Flink better, to support more use-case with the
> > most maintainable code. I hope you understand that just like you, I have
> > also been doing my best to think through various design options and
> t

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-08 Thread Dong Lin
Hi Jing,

Thanks for the suggestions. Please see my reply inline.

On Fri, Jul 7, 2023 at 3:50 PM Jing Ge  wrote:

> Hi Dong,
>
> Thanks for your clarification.
>
>
> > Actually, I think it could make sense to toggle isBacklog between true
> and
> > false while the job is running.
> >
>
> If isBacklog is toggled too often back and forth(e.g. by unexpected
> mistake, unstable system, etc), a large amount of RecordAttributes might be
> triggered, which will lead to performance issues. This should not be the
> right way to use RecordAttributes right? Devs and users should be aware of
> it and know how to monitor, maintain, and fix issues.
>
> Your reply contains valuable information. It might make sense to add them
> into the FLIP:
>
> 1. It is up to the operator to decide when to emit RecordAttributes. But
> devs and users should be aware that the number of RecordAttributes should
> not be too high to cause performance issues.
>

Sure, I have updated the FLIP to include the following statement:

"Note: It is up to the operator implementation to decide when (and how
often) to emit RecordAttributes, similar to how operators emit
RecordAttributes. The overhead of emitting Watermark is similar to the
overhead of emitting Watermark"


> 2. Although users can decide how to configure them, the end-to-end latency
> should be (commonly?) configured lower than the checkpoint interval.
>

Since this is related to the performance tuning rather than
correctness/functionality of the core APIs, I added the following sentence
in the Appendix section:

"We expect that end-to-end latency will typically be configured with a
value lower than the checkpoint interval"

3. The three ways you mentioned for how to derive isBacklog.
>

Sure, I have updated the FLIP to include the following information:

"In the future, we expect IsProcessingBacklog can very likely be determined
using the following strategies ..."

Best,
Dong


>
> WDYT?
>
> Best regards,
> Jing
>
>
> On Fri, Jul 7, 2023 at 3:13 AM Dong Lin  wrote:
>
> > Hi Jing,
> >
> > Thanks for the comments. Please see my reply inline.
> >
> > On Fri, Jul 7, 2023 at 5:40 AM Jing Ge 
> wrote:
> >
> > > Hi,
> > >
> > > Thank you all for the inspired discussion. Really appreciate it!
> > >
> > > @Dong I'd like to ask some (stupid) questions to make sure I understand
> > > your thoughts correctly.
> > >
> > > 1. It will make no sense to send the same type of RecordAttributes
> right?
> > > e.g.  if one RecordAttributes(isBacklog=true) has been sent, a new
> > > RecordAttributes will be only sent when isBacklog is changed to be
> false,
> > > and vice versa. In this way, the number of RecordAttributes will be
> very
> > > limited.
> > >
> >
> > Yes, you are right. Actually, this is what we plan to do when we update
> > operators to emit RecordAttributes via `Output#emitRecordAttributes()`.
> >
> > Note that the FLIP does not specify the frequency of how operators should
> > invoke `Output#emitRecordAttributes()`. It is up to the operator
> > to decide when to emit RecordAttributes.
> >
> >
> > > 2. Since source readers can invoke Output#emitRecordAttributes to emit
> > > RecordAttributes(isBacklog=true/false), it might be weird to send
> > > RecordAttributes with different isBacklog back and forth too often.
> Devs
> > > and users should pay attention to it. Something is wrong when such a
> > thing
> > > happens(metrics for monitoring?). Is this correct?
> > >
> >
> >
>
> > Actually, I think it could make sense to toggle isBacklog between true
> and
> > false while the job is running.
> >
> >
>
> > Suppose the job is reading from user-action data from Kafka and there is
> a
> > traffic spike for 2 hours. If the job keeps running in pure stream mode,
> > the watermark lag might keep increasing during this period because the
> > job's processing capability can not catch up with the Kafka input
> > throughput. In this case, it can be beneficial to dynamically switch
> > isBacklog to true when watermarkLag exceeds a given threshold (e.g. 5
> > minutes), and switch isBacklog to false again when the watermarkLag is
> low
> > enough (30 seconds).
> >
> >
> > > 3. Is there any relationship between end-to-end-latency and checkpoint
> > > interval that users should pay attention to? In the example described
> in
> > > the FLIP, both have the same value, 2 min. What about
> end-to-end-latency
> > is
> > > configured

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-06 Thread Dong Lin
Hi Jing,

Thank you for the suggestion. Yes, we can extend it to support null if in
the future we find any use-case for this flexibility.

Best,
Dong

On Thu, Jul 6, 2023 at 7:55 PM Jing Ge  wrote:

> Hi Dong,
>
> one scenario I could imagine is that users could enable global object
> reuse features but force deep copy for some user defined specific functions
> because of any limitations. But that is only my gut feeling. And agree, we
> could keep the solution simple for now as FLIP described and upgrade to 3VL
> once there are such real requirements that are rising.
>
> Best regards,
> Jing
>
> On Thu, Jul 6, 2023 at 12:30 PM Dong Lin  wrote:
>
>> Hi Jing,
>>
>> Thank you for the detailed explanation. Please see my reply inline.
>>
>> On Thu, Jul 6, 2023 at 3:17 AM Jing Ge  wrote:
>>
>>> Hi Xuannan, Hi Dong,
>>>
>>> Thanks for your clarification.
>>>
>>> @Xuannan
>>>
>>> A Jira ticket has been created for the doc update:
>>> https://issues.apache.org/jira/browse/FLINK-32546
>>>
>>> @Dong
>>>
>>> I don't have a concrete example. I just thought about it from a
>>> conceptual or pattern's perspective. Since we have 1. coarse-grained global
>>> switch(CGS as abbreviation), i.e. the pipeline.object-reuse and 2.
>>> fine-grained local switch(FGS as abbreviation), i.e. the
>>> objectReuseCompliant variable for specific operators/functions, there will
>>> be the following patterns with appropriate combinations:
>>>
>>> pattern 1: coarse-grained switch only. Local object reuse will be
>>> controlled by the coarse-grained switch:
>>> 1.1 cgs == true -> local object reused enabled
>>> 1.2 cgs == true  -> local object reused enabled
>>> 1.3 cgs == false -> local object reused disabled, i.e. deep copy enabled
>>> 1.4 cgs == false -> local object reused disabled, i.e. deep copy enabled
>>>
>>> afaiu, this is the starting point. I wrote 4 on purpose to make the
>>> regression check easier. We can consider it as the combinations with
>>> cgs(true/false) and fgs(true/false) while fgs is ignored.
>>>
>>> Now we introduce fine-grained switch. There will be two patterns:
>>>
>>> pattern 2: fine-grained switch over coarse-grained switch.
>>> Coarse-grained switch will be ignored when the local fine-grained switch
>>> has different value:
>>> 2.1 cgs == true and fgs == true -> local object reused enabled
>>> 2.2 cgs == true and fgs == false -> local object reused disabled, i.e.
>>> deep copy enabled
>>> 2.3 cgs == false and fgs == true -> local object reused enabled
>>> 2.4 cgs == false and fgs == false -> local object reused disabled, i.e.
>>> deep copy enabled
>>>
>>> cgs is actually ignored.
>>>
>>> Current FLIP is using a slightly different pattern:
>>>
>>> pattern 3: fine-grained switch over coarse-grained switch only when
>>> coarse-grained switch is off, i..e cgs OR fgs:
>>> 3.1 cgs == true and fgs == true -> local object reused enabled
>>> 3.2 cgs == true and fgs == false -> local object reused enabled
>>> 3.3 cgs == false and fgs == true -> local object reused enabled
>>> 3.4 cgs == false and fgs == false -> local object reused disabled, i.e.
>>> deep copy enabled
>>>
>>> All of those patterns are rational and each has different focus. It
>>> depends on the real requirement to choose one of them.
>>>
>>> As we can see, if fgs is using 2VL, there is a regression between
>>> pattern 1 and pattern 2. You are absolutely right in this case. That's why
>>> I suggested 3VL, i.e. fgs will have triple values: true, false,
>>> unknown(e.g. null)
>>>
>>> pattern 4: 3VL fgs with the null as init value (again, there are just
>>> two combination, I made it 4 on purpose):
>>> 4.1 cgs == true and fgs == null -> local object reused enabled
>>> 4.2 cgs == true and fgs == null -> local object reused enabled
>>> 4.3 cgs == false and fgs == null -> local object reused disabled, i.e.
>>> deep copy enabled
>>> 4.4 cgs == false and fgs == null -> local object reused disabled, i.e.
>>> deep copy enabled
>>>
>>> Since the default value of fgs is null, pattern 4 is backward compatible
>>> with pattern 1, which means no regression.
>>>
>>> Now we will set value to fgs and follow the pattern 2:
>>> 4.5 cgs == true and fgs == true -> local object reused ena
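
To make the patterns above concrete, here is a small illustrative Java sketch
(not the FLIP's actual implementation) of how the two switches could be
combined, covering both the FLIP's current two-valued proposal (pattern 3)
and the three-valued variant (patterns 2/4):

// Illustrative only. `coarseGrained` stands for pipeline.object-reuse,
// `fineGrained` for the per-operator objectReuseCompliant attribute.
final class ObjectReusePolicySketch {

    // Pattern 3 (the FLIP's proposal): reuse if either switch enables it.
    static boolean reuseWithTwoValuedAttribute(boolean coarseGrained, boolean fineGrained) {
        return coarseGrained || fineGrained;
    }

    // Patterns 2/4 (three-valued attribute): an explicitly set attribute
    // overrides the global config; null ("unknown") falls back to the config.
    static boolean reuseWithThreeValuedAttribute(boolean coarseGrained, Boolean fineGrained) {
        return fineGrained != null ? fineGrained : coarseGrained;
    }

    private ObjectReusePolicySketch() {}
}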

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-06 Thread Dong Lin
o be 100%, 99%,
or 90%? I would prefer not to have user worry about this if option-1 or
option-2 can be used.

RecordAttributes would be necessary in order to support option-1 and
option-2 well.


> 5. NIT: Just like we talked about in another thread, JavaBean naming
> convention is recommended, i.e. isBacklog() & setBacklog() instead of
> getIsBacklog() and setIsBacklog().
>

Yeah, thanks for the suggestion. I have updated the FLIP as suggested.

Best,
Dong


>
> Best regards,
> Jing
>
> On Thu, Jul 6, 2023 at 2:38 PM Dong Lin  wrote:
>
> > Hi Shammon,
> >
> > Thanks for your comments. Please see my reply inline.
> >
> >
> > On Thu, Jul 6, 2023 at 12:47 PM Shammon FY  wrote:
> >
> > > Hi,
> > >
> > > Thanks for your replay @Dong. I really agree with Piotr's points and I
> > > would like to share some thoughts from my side.
> > >
> > > About the latency for mini-batch mechanism in Flink SQL, I still think
> > the
> > > description in the FLIP is not right. If there are N operators and the
> > > whole process time for data in the job is `t`, then the latency in
> > > mini-batch will be `table.exec.mini-batch.allow-latency`+`t`, not `
> > > table.exec.mini-batch.allow-latency`*N. I think this is one of the
> > > foundations of this FLIP, and you may need to confirm it again.
> > >
> >
> > Given that we agree to have a mechanism to support end-to-end latency for
> > DataStream programs, I think the exact semantics of
> > table.exec.mini-batch.allow-latency will not affect the motivation or API
> > design of this FLIP. I have updated the FLIP to remove any mention of
> > table.exec.mini-batch.allow-latency.
> >
> >
> > >
> > > I think supporting similar mechanisms in the runtime and balance
> latency
> > > and throughput dynamically for all flink jobs is a very good idea, and
> I
> > > have some questions for that.
> > >
> > > 1. We encounter a situation where the workload is high when processing
> > > snapshot data and we need mini-batch in sql for performance reason. But
> > the
> > > workload is low when processing delta data, we need to automatically
> > adjust
> > > the mini-batch SQL for them, or even cancel the mini-batch during delta
> > > processing. I think this FLIP meets our needs, but I think we need a
> > > general solution which covers all source types in flink, and the
> > > `isBacklog` in the FLIP is only one strategy.
> > >
> >
> > The focus of this FLIP is to allow Flink runtime to adjust the behavior
> of
> > operators (e.g. the buffer time) based on the IsBacklog status of sources
> > and the user-specified execution.end-to-end-latency (effective only when
> > there is no backlog). The FLIP assumes there is already a strategy for
> > sources to determine the IsProcessingBacklog status without adding more
> > strategies.
> >
> > I agree it is useful to introduce more strategies to determine the the
> > IsProcessingBacklog status for sources. We can determine the
> > IsProcessingBacklog status based on the backpressure metrics, the
> > event-time watermark lag, or anything we find reasonable. I would like to
> > work on this in follow-up FLIPs and that we don't work on too many things
> > in the same FLIP.
> >
> > Would this be OK with you?
> >
> >
> > > From the FLIP I think there should be two parts: dynamic trigger flush
> > > event in JM and dynamic trigger flush operations in Operator. We need
> to
> > > introduce much more general interfaces for them, such as
> > > `DynamicFlushStrategy` in JM and `DynamicFlushOperation` in TM? As
> Piotr
> > > mentioned above, we can collect many information from TM locally such
> as
> > > backpressure, queue size and `Operator` can decide whether to buffer
> data
> > > or process it immediately.  JM is also the same, it can decide to send
> > > flush events on a regular basis or send them based on the collected
> > metrics
> > > information and other information, such as the isBacklog in the FLIP.
> > >
> > > 2. I really don't get enough benefits for `RecordAttribute` in the FLIP
> > and
> > > as Piotr mentioned above too, it will generate a large number of
> > messages,
> > >
> >
> > If there is any sentence in the FLIP that suggests we will emit a lot of
> > RecordAttribute, sorry for that and I would fix it.
> >
> > Currently, the FLIP provides the `Output#emitRecordAttributes()` for
> > operators 

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-06 Thread Dong Lin
pect? So in case of backpressure, we should still
> > adhere to the configured e2e latency,
> >   and wait for the user or autoscaler to scale up the job?
> >
> > In case of a), I think the concept of "isProcessingBacklog" is not
> needed,
> > we could steer the behaviour only
> > using the backpressure information.
> >
> > On the other hand, in case of b), "isProcessingBacklog" information might
> > be helpful, to let Flink know that
> > we can safely decrease the e2e latency/checkpoint interval even if there
> > is no backpressure, to use fewer
> > resources (and let the autoscaler scale down the job).
> >
> > Do we want to have both, or only one of those? Do a) and b) complement
> one
> > another? If job is backpressured,
> > we should follow a) and expose to autoscaler/users information "Hey! I'm
> > barely keeping up! I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> > (isProcessingBacklog=true), we can limit the resource
> > usage.
> >
> > And a couple of more concrete remarks about the current proposal.
> >
> > 1.
> >
> > > I think the goal is to allow users to specify an end-to-end latency
> > budget for the job.
> >
> > I fully agree with this, but in that case, why are you proposing to add
> > `execution.flush.interval`? That's
> > yet another parameter that would affect e2e latency, without actually
> > defining it. We already have things
> > like: execution.checkpointing.interval, execution.buffer-timeout. I'm
> > pretty sure very few Flink users would be
> > able to configure or understand all of them.
> >
> > I think we should simplify configuration and try to define
> > "execution.end-to-end-latency" so the runtime
> > could derive other things from this new configuration.
> >
> > 2. How do you envision `#flush()` and `#snapshotState()` to be connected?
> > So far, `#snapshotState()`
> > was considered as a kind of `#flush()` signal. Do we need both? Shouldn't
> > `#flush()` be implicitly or
> > explicitly attached to the `#snapshotState()` call?
> >
> > 3. What about unaligned checkpoints if we have separate `#flush()`
> > event/signal?
> >
> > 4. How should this be working in at-least-once mode (especially sources
> > that are configured to be working
> > in at-least-once mode)?.
> >
> > 5. How is this FLIP connected with FLI-327? I think they are trying to
> > achieve basically the same thing:
> > optimise when data should be flushed/committed to balance between
> > throughput and latency.
> >
> > 6.
> >
> > > Add RecordAttributesBuilder and RecordAttributes that extends
> > StreamElement to provide operator with essential
> > > information about the records they receive, such as whether the records
> > are already stale due to backlog.
> >
> > Passing along `RecordAttribute` for every `StreamElement` would be an
> > extremely inefficient solution.
> >
> > If at all, this should be a marker propagated through the JobGraph vie
> > Events or sent from JM to TMs via an RPC
> > that would mark "backlog processing started/ended". Note that Events
> might
> > be costly, as they need to be
> > broadcasted. So with a job having 5 keyBy exchanges and parallelism of
> > 1000, the number of events sent is
> > ~4 000 000, while the number of RPCs would be only 5000.
> >
> > In case we want to only check for the backpressure, we don't need any
> > extra signal. Operators/subtasks can
> > get that information very easily from the TMs runtime.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Jun 29, 2023 at 5:19 PM Dong Lin  wrote:
> >
> >> Hi Shammon,
> >>
> >> Thanks for your comments. Please see my reply inline.
> >>
> >> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY  wrote:
> >>
> >> > Hi Dong and Yunfeng,
> >> >
> >> > Thanks for bringing up this discussion.
> >> >
> >> > As described in the FLIP, the differences between `end-to-end latency`
> >> and
> >> > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
> >> the
> >> > end-to-end latency, whereas table.exec.mini-batch.allow-latency
> applies
> >> to
> >> > each operator. If there are N operators on the path from source to
> sink,
> >> > the end-to-end latency could be up to
> >> table.exec.mini-batch.allow-latency *
>

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-06 Thread Dong Lin
Hi Piotr,

Thanks for your comments! Please see my reply inline.

On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski 
wrote:

> Hi Dong,
>
> I have a couple of questions.
>
> Could you explain why those properties
>
> @Nullable private Boolean isOutputOnEOF = null;
> @Nullable private Boolean isOutputOnCheckpoint = null;
> @Nullable private Boolean isInternalSorterSupported = null;
>
> must be `@Nullable`, instead of having the default value set to `false`?
>

By initializing these private variables in OperatorAttributesBuilder as
null, we can implement `OperatorAttributesBuilder#build()` in such a way
that it can print DEBUG level logging to say "isOutputOnCheckpoint is not
explicitly set". This can help user/SRE debug performance issues (or lack
of the expected optimization) due to operators not explicitly setting the
right operator attribute.

For example, we might want a job to always use the longer checkpointing
interval (i.e. execution.checkpointing.interval-during-backlog) if all
running operators have isOutputOnCheckpoint==false, and use the short
checkpointing interval otherwise. If a user has explicitly configured the
execution.checkpointing.interval-during-backlog but the two-phase commit
sink library has not been upgraded to set isOutputOnCheckpoint=true, then
the job will end up using the long checkpointing interval, and it will be
useful to figure out what is going wrong in this case by checking the log.

Note that the default value of these fields of the OperatorAttributes
instance built by OperatorAttributesBuilder will still be false. The
following is mentioned in the Java doc of
`OperatorAttributesBuilder#build()`:

 /**
  * If any operator attribute is null, we will log it at DEBUG level and use
  * the following default values.
  * - isOutputOnEOF defaults to false
  * - isOutputOnCheckpoint defaults to false
  * - isInternalSorterSupported defaults to false
  */
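
For illustration, a sketch of the builder behaviour described above (the class
and field names follow the FLIP, but the body below and the OperatorAttributes
constructor are assumptions, not the actual implementation):

import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: nullable fields let build() detect which attributes were
// never explicitly set, log that at DEBUG level, and still default them to false.
public class OperatorAttributesBuilder {
    private static final Logger LOG =
            LoggerFactory.getLogger(OperatorAttributesBuilder.class);

    @Nullable private Boolean isOutputOnEOF = null;
    @Nullable private Boolean isOutputOnCheckpoint = null;
    @Nullable private Boolean isInternalSorterSupported = null;

    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {
        this.isOutputOnEOF = outputOnEOF;
        return this;
    }

    // ... setters for the other two attributes would look the same ...

    public OperatorAttributes build() {
        if (isOutputOnCheckpoint == null) {
            LOG.debug("isOutputOnCheckpoint is not explicitly set; defaulting to false.");
        }
        // (same DEBUG logging for the other two attributes)
        return new OperatorAttributes(
                isOutputOnEOF != null && isOutputOnEOF,
                isOutputOnCheckpoint != null && isOutputOnCheckpoint,
                isInternalSorterSupported != null && isInternalSorterSupported);
    }
}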


>
> Second question, have you thought about cases where someone is
> either bootstrapping from a streaming source like Kafka
> or simply trying to catch up after a long period of downtime in a purely
> streaming job? Generally speaking a cases where
> user doesn't care about latency in the catch up phase, regardless if the
> source is bounded or unbounded, but wants to process
> the data as fast as possible, and then switch dynamically to real time
> processing?
>

Yes, I have thought about this. We should allow this job to effectively run
in batch mode when the job is in the catch-up phase. FLIP-327 is actually
an important step toward addressing this use-case.

In order to address the above use-case, all we need is a way for source
operator (e.g. Kafka) to tell Flink runtime (via IsProcessingBacklog)
whether it is in the catch-up phase.

Since every Kafka message has an event timestamp, we can allow users to
specify a job-level config such as backlog-watermark-lag-threshold, and
consider a Kafka Source to have IsProcessingBacklog=true if system_time -
watermark > backlog-watermark-lag-threshold. This effectively allows us to
determine whether Kafka is in the catch-up phase.

Once we have this capability (I plan to work on this in FLIP-328), we can
directly use the features proposed in FLIP-325 and FLIP-327 to optimize the
above use-case.

What do you think?

Best,
Dong


>
> Best,
> Piotrek
>
> On Sun, Jul 2, 2023 at 4:15 PM Dong Lin  wrote:
>
> > Hi all,
> >
> > I am opening this thread to discuss FLIP-327: Support stream-batch
> unified
> > operator to improve job throughput when processing backlog data. The
> design
> > doc can be found at
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > .
> >
> > This FLIP enables a Flink job to initially operate in batch mode,
> achieving
> > high throughput while processing records that do not require low
> processing
> > latency. Subsequently, the job can seamlessly transition to stream mode
> for
> > processing real-time records with low latency. Importantly, the same
> state
> > can be utilized before and after this mode switch, making it particularly
> > valuable when users wish to bootstrap the job's state using historical
> > data.
> >
> > We would greatly appreciate any comments or feedback you may have on this
> > proposal.
> >
> > Cheers,
> > Dong
> >
>


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-06 Thread Dong Lin
m a conceptual perspective and trying to
make it similar to hierarchical RBAC. However, after thinking through this,
I still could not find a use-case where we actually need this flexibility.
In particular, for cases where a user has explicitly configured
pipeline.object-reuse to true, that means the user already knows (or takes
the responsibility of ensuring) that correctness can be achieved, why would
the user want to explicitly disable the object-reuse for an operator?

Regards,
Dong


>
> It is commonly used in the hierarchical RBAC to enable more fine-grained
> access control of sub role.
>
> I hope I have been able to explain myself clearly. Looking forward to your
> feedback.
>
> Best regards,
> Jing
>
>
>
> On Wed, Jul 5, 2023 at 12:47 PM Dong Lin  wrote:
>
>> Hi Jing,
>>
>> Thanks for the comments! Please find below my comments, which are based
>> on the offline discussion with Xuannan.
>>
>> On Wed, Jul 5, 2023 at 1:36 AM Jing Ge  wrote:
>>
>>> Hi Xuannan, Hi Dong
>>>
>>> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
>>> questions:
>>>
>>> 1. Naming convention for boolean variables. It is recommended to follow
>>> JavaBean [1], i.e. objectReuseCompliant as the variable name with
>>> isObjectReuseCompliant() and setObjectReuseCompliant() as the methods' name.
>>>
>>>
>> Good point. We have updated the FLIP as suggested.
>>
>>
>>>
>>> 2.
>>>
>>>-
>>>
>>>*If pipeline.object-reuse is set to true, records emitted by this
>>>operator will be re-used.*
>>>-
>>>
>>>*Otherwise, if getIsObjectReuseCompliant() returns true, records
>>>emitted by this operator will be re-used.*
>>>-
>>>
>>>*Otherwise, records emitted by this operator will be deep-copied
>>>before being given to the next operator in the chain.*
>>>
>>>
>>> If I understand you correctly, the hard-coded objectReuseCompliant
>>> should take priority over the configuration; the checking logic
>>> should be:
>>>
>>>-
>>>
>>>*If getIsObjectReuseCompliant() returns true, records emitted by
>>>this operator will be re-used.*
>>>-
>>>
>>>*Otherwise, if pipeline.object-reuse is set to true, records emitted
>>>by this operator will be re-used.*
>>>-
>>>
>>>*Otherwise, records emitted by this operator will be deep-copied
>>>before being given to the next operator in the chain.*
>>>
>>>
>>> The results are the same but the checking logics are different, but
>>> there are some additional thoughts, which lead us to the next question.
>>>
>>
>>>
>>
>>> 3. Current design lets specific operators enable object reuse and ignore
>>> the global config. There could be another thought, on the contrary: if an
>>> operator has hard coded the objectReuseCompliant as false, i.e. disable
>>> object reuse on purpose, records should not be reused even if the global
>>> config pipeline.object-reused is set to true, which turns out that the
>>> objectReuseCompliant could be a triple-value logic: true: force object
>>> reusing; false: force deep-copying; unknown: depends on
>>> pipeline.object-reuse config.
>>>
>>
>> With the current proposal, if pipeline.object-reused == true and
>> operatorA's objectReuseCompliant == false, Flink will not deep-copy
>> operatorA's output. I think you are suggesting changing the behavior such
>> that Flink should deep-copy the operatorA's output.
>>
>> Could you explain what is the advantage of this approach compared to the
>> approach described in the FLIP?
>>
>> My concern with this approach is that it can cause performance
>> regression. This is because an operator's objectReuseCompliant will be
>> false by default unless it is explicitly overridden. Jobs that are
>> currently configured with pipeline.object-reuse = true will likely start
>> to have lower performance (due to object deep-copies) after upgrading to
>> a newer Flink version.
>>
>> Best,
>> Dong
>>
>>
>>>
>>> Best regards,
>>> Jing
>>>
>>>
>>> [1] https://en.wikipedia.org/wiki/JavaBeans
>>>
>>> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su  wrote:
>>>
>>>> Hi all,
>>>>
>>>> Dong(cc'ed) and I are opening this thread to discuss our proposal to
>>>> add operator attribute to allow operator to specify support for
>>>> object-reuse [1].
>>>>
>>>> Currently, the default configuration for pipeline.object-reuse is set
>>>> to false to avoid data corruption, which can result in suboptimal
>>>> performance. We propose adding APIs that operators can utilize to
>>>> inform the Flink runtime whether it is safe to reuse the emitted
>>>> records. This enhancement would enable Flink to maximize its
>>>> performance using the default configuration.
>>>>
>>>> Please refer to the FLIP document for more details about the proposed
>>>> design and implementation. We welcome any feedback and opinions on
>>>> this proposal.
>>>>
>>>> Best regards,
>>>>
>>>> Dong and Xuannan
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>>>
>>>


Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-06 Thread Dong Lin
ven when execution.end-to-end-latency is configured.

The goal of FLIP-327 requires us to update the logic of building the JobGraph,
such as allowing the output edge of an operator to be blocking in stream
mode, and adding managed memory for an operator that claims it uses an
internal sorter.


> 6.
>
> > Add RecordAttributesBuilder and RecordAttributes that extends
> StreamElement to provide operator with essential
> > information about the records they receive, such as whether the records
> are already stale due to backlog.
>
> Passing along `RecordAttribute` for every `StreamElement` would be an
> extremely inefficient solution.
>

Currently, the FLIP provides the `Output#emitRecordAttributes()` for
operators (e.g. source reader) to emit RecordAttributes. The FLIP leaves
the operator to decide the frequency and value of the emitted
RecordAttributes.

Our plan is to let SourceReader emit RecordAttributes only when its value
(e.g. isBacklog) differs from the value of the RecordAttributes it has
emitted earlier. It should avoid resending RecordAttributes with the same
value, similar to how Flink currently avoids resending
Watermark/WatermarkStatus with the same value.

Would it address your concern?


> If at all, this should be a marker propagated through the JobGraph vie
> Events or sent from JM to TMs via an RPC
> that would mark "backlog processing started/ended". Note that Events might
> be costly, as they need to be
> broadcasted. So with a job having 5 keyBy exchanges and parallelism of
> 1000, the number of events sent is
> ~4 000 000, while the number of RPCs would be only 5000.
>

I think we probably need to propagate the IsBacklog (or IsBackPressure,
whatever name we find reasonable) signal from sources to downstream nodes
similar to how we currently propagate watermarks. The benefit is that we
can effectively align the IsBacklog signal with the data records, and tell
operators the following:

records received before RecordAttributes(isBacklog=false) can be
processed/emitted with high processing latency because these records are
already very stale due to the backlog; but records received after
RecordAttributes(isBacklog=false) are fresh and need to be processed with
low processing latency.

It would be hard to have this semantics if we let JM send the signal
directly to all the TMs.
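
As a purely illustrative sketch (the class and method names below are made up
for this example, not taken from the FLIP), a downstream operator could use
the aligned signal roughly like this:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch: buffer records while isBacklog=true, then flush and
// switch to record-at-a-time processing once RecordAttributes(isBacklog=false)
// arrives, because records after that point are fresh and latency-sensitive.
class BacklogAwareOperatorSketch<T> {
    private final Consumer<T> downstream;
    private final List<T> buffer = new ArrayList<>();
    private boolean isBacklog = true;

    BacklogAwareOperatorSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void processRecordAttributes(boolean newIsBacklog) {
        if (isBacklog && !newIsBacklog) {
            buffer.forEach(downstream); // emit everything buffered during the backlog
            buffer.clear();
        }
        isBacklog = newIsBacklog;
    }

    void processRecord(T record) {
        if (isBacklog) {
            buffer.add(record);        // high latency is acceptable for stale records
        } else {
            downstream.accept(record); // fresh records: process with low latency
        }
    }
}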


>
> In case we want to only check for the backpressure, we don't need any extra
> signal. Operators/subtasks can
> get that information very easily from the TMs runtime.


We would need the extra signal to cover the case where the IsBacklog status
needs to be derived from source state (e.g. MySQL source snapshot/binlog
stage).

When we want to only check for backpressure, another potential benefit
of propagating isBacklog from source operators to downstream operators is
that we might take advantage of features in FLIP-327, dynamically disable
checkpointing, and apply sorting to optimize operator throughput. IMHO,
such an optimization is probably easier to understand and operate when
source operators decide the backpressure status, rather than letting each
operator decide the pressure status using local information.

What do you think? I am looking forward to your suggestions!

Best,
Dong


> Best,
> Piotrek
>
> On Thu, Jun 29, 2023 at 5:19 PM Dong Lin  wrote:
>
> > Hi Shammon,
> >
> > Thanks for your comments. Please see my reply inline.
> >
> > On Thu, Jun 29, 2023 at 6:01 PM Shammon FY  wrote:
> >
> > > Hi Dong and Yunfeng,
> > >
> > > Thanks for bringing up this discussion.
> > >
> > > As described in the FLIP, the differences between `end-to-end latency`
> > and
> > > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
> > the
> > > end-to-end latency, whereas table.exec.mini-batch.allow-latency applies
> > to
> > > each operator. If there are N operators on the path from source to
> sink,
> > > the end-to-end latency could be up to
> > table.exec.mini-batch.allow-latency *
> > > N".
> > >
> > > If I understand correctly, `table.exec.mini-batch.allow-latency` is
> also
> > > applied to the end-to-end latency for a job, maybe @Jack Wu can give
> more
> > > information.
> > >
> >
> > Based on what I can tell from the doc/code and offline discussion, I
> > believe table.exec.mini-batch.allow-latency is not applied to the
> > end-to-end latency for a job.
> >
> > It is mentioned here
> > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/>
> > that
> > table.exec.mini-batch.allow-latency is "the maximum latency can be used
> for
> > MiniBatch to buffer input

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-05 Thread Dong Lin
Hi Piotr,

I am sorry if you feel unhappy or upset with us for not following/fixing
your proposal. It is not my intention to give you this feeling. After all,
we are all trying to make Flink better, to support more use-cases with the
most maintainable code. I hope you understand that just like you, I have
also been doing my best to think through various design options and taking
time to evalute the pros/cons. Eventually, we probably still need to reach
consensus by clearly listing and comparing the objective pros/cons of
different proposals and identifying the best choice.

Regarding your concern (or frustration) that we are always finding issues
in your proposal, I would say it is normal (and probably necessary) for
developers to find pros/cons in each other's solutions, so that we can
eventually pick the right one. I will appreciate anyone who can correctly
pinpoint the concrete issue in my proposal so that I can improve it or
choose an alternative solution.

Regarding your concern that we are not spending enough effort to find
solutions and that the problem in your solution can be solved in a minute,
I would like to say that is not true. For each of your previous proposals,
I typically spent 1+ hours thinking through your proposal to understand
whether it works and why it does not work, and another 1+ hour to write
down the details and explain why it does not work. And I have had a variety
of offline discussions with my colleagues discussing various proposals
(including yours) with 6+ hours in total. Maybe I am not capable enough to
fix those issues in one minute or so. If you think your proposal can be
easily fixed in one minute or so, I would really appreciate it if you can
think through your proposal and fix it in the first place :)

For your information, I have had several long discussions with my
colleagues at Alibaba and also Becket on this FLIP. We have seriously
considered your proposals and discussed in detail what are the pros/cons
and whether we can improve these solutions. The initial version of this
FLIP (which allows the source operator to specify checkpoint intervals)
does not get enough support due to concerns of not being generic (i.e.
users need to specify checkpoint intervals on a per-source basis). It is
only after I updated the FLIP to use the job-level
execution.checkpointing.interval-during-backlog, then they agree to give +1
to the FLIP. What I want to tell you is that your suggestions have been
taken seriously, and the quality of the FLIP has been taken seriously
by all those who have voted. As a result of taking your suggestion
seriously and trying to find improvements, we updated the FLIP to use
isProcessingBacklog.

I am wondering, do you think it will be useful to discuss face-to-face via
video conference call? It is not just between you and me. We can invite the
developers who are interested to join and help with the discussion. That
might improve communication efficiency and help us understand each other
better :)

I am writing this long email to hopefully get your understanding. I care
much more about the quality of the eventual solution rather than who
proposed the solution. Please bear with me and see my comments inline, with
an explanation of the pros/cons of these proposals.


On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski 
wrote:

> Hi Guys,
>
> I would like to ask you again, to spend a bit more effort on trying to find
> solutions, not just pointing out problems. For 1.5 months,
> the discussion doesn't go in circle, but I'm suggesting a solution, you are
> trying to undermine it with some arguments, I'm coming
> back with a fix, often an extremely easy one, only for you to try to find
> yet another "issue". It doesn't bode well, if you are finding
> a "problem" that can be solved with a minute or so of thinking or even has
> already been solved.
>
> I have provided you so far with at least three distinct solutions that
> could address your exact target use-case. Two [1][2] generic
> enough to be probably good enough for the foreseeable future, one
> intermediate and not generic [3] but which wouldn't
> require @Public API changes or some custom hidden interfaces.


> All in all:
> - [1] with added metric hints like "isProcessingBacklog" solves your target
> use case pretty well. Downside is having to improve
>   how JM is collecting/aggregating metrics
>

Here is my analysis of this proposal compared to the current approach in
the FLIP-309.

pros:
- No need to add the public API
SplitEnumeratorContext#setIsProcessingBacklog.
cons:
- Need to add a public API that subclasses of SourceReader can use to
specify its IsProcessingBacklog metric value.
- Source Coordinator needs to periodically pull the isProcessingBacklog
metrics from all TMs throughout the job execution.

Here is why I think the cons outweigh the pros:
1) JM needs to collect/aggregate metrics with extra runtime overhead, which
is not necessary for the target use-case with the push-based approach in

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-05 Thread Dong Lin
Hi Jing,

Thanks for the comments! Please find below my comments, which are based on
the offline discussion with Xuannan.

On Wed, Jul 5, 2023 at 1:36 AM Jing Ge  wrote:

> Hi Xuannan, Hi Dong
>
> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
> questions:
>
> 1. Naming convention for boolean variables. It is recommended to follow
> JavaBean [1], i.e. objectReuseCompliant as the variable name with
> isObjectReuseCompliant() and setObjectReuseCompliant() as the methods' name.
>
>
Good point. We have updated the FLIP as suggested.


>
> 2.
>
>-
>
>*If pipeline.object-reuse is set to true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if getIsObjectReuseCompliant() returns true, records
>emitted by this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied
>before being given to the next operator in the chain.*
>
>
> If I understand you correctly, the hard-coded objectReuseCompliant
> should take priority over the configuration; the checking logic
> should be:
>
>-
>
>*If getIsObjectReuseCompliant() returns true, records emitted by this
>operator will be re-used.*
>-
>
>*Otherwise, if pipeline.object-reuse is set to true, records emitted
>by this operator will be re-used.*
>-
>
>*Otherwise, records emitted by this operator will be deep-copied
>before being given to the next operator in the chain.*
>
>
> The results are the same but the checking logics are different, but there
> are some additional thoughts, which lead us to the next question.
>

>

> 3. Current design lets specific operators enable object reuse and ignore
> the global config. There could be another thought, on the contrary: if an
> operator has hard coded the objectReuseCompliant as false, i.e. disable
> object reuse on purpose, records should not be reused even if the global
> config pipeline.object-reused is set to true, which turns out that the
> objectReuseCompliant could be a triple-value logic: true: force object
> reusing; false: force deep-copying; unknown: depends on
> pipeline.object-reuse config.
>

With the current proposal, if pipeline.object-reused == true and
operatorA's objectReuseCompliant == false, Flink will not deep-copy
operatorA's output. I think you are suggesting changing the behavior such
that Flink should deep-copy the operatorA's output.

Could you explain what is the advantage of this approach compared to the
approach described in the FLIP?

My concern with this approach is that it can cause performance regression.
This is because an operator's objectReuseCompliant will be false by default
unless it is explicitly overridden. Jobs that are currently configured with
pipeline.object-reuse = true will likely start to have lower performance
(due to object deep-copies) after upgrading to a newer Flink version.

Best,
Dong


>
> Best regards,
> Jing
>
>
> [1] https://en.wikipedia.org/wiki/JavaBeans
>
> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su  wrote:
>
>> Hi all,
>>
>> Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> add operator attribute to allow operator to specify support for
>> object-reuse [1].
>>
>> Currently, the default configuration for pipeline.object-reuse is set
>> to false to avoid data corruption, which can result in suboptimal
>> performance. We propose adding APIs that operators can utilize to
>> inform the Flink runtime whether it is safe to reuse the emitted
>> records. This enhancement would enable Flink to maximize its
>> performance using the default configuration.
>>
>> Please refer to the FLIP document for more details about the proposed
>> design and implementation. We welcome any feedback and opinions on
>> this proposal.
>>
>> Best regards,
>>
>> Dong and Xuannan
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-04 Thread Dong Lin
Hi Piotr,

Any suggestion on how we can practically move forward to address the target
use-case?

My understanding is that the current proposal does not have any
correctness/performance issues. And it can be extended to support all
the extra use-cases without having to throw away the proposed APIs.

If you prefer a better solution with simpler APIs and yet the same or
better correctness/performance for the target use-case, could you please
kindly explain its API design so that we can continue the discussion?


Best,
Dong

On Mon, Jul 3, 2023 at 6:39 PM Dong Lin  wrote:

> Hi Piotr,
>
> Please see my comments inline.
>
> On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski 
> wrote:
>
>> Hi Dong,
>>
>> Starting from the end:
>>
>> > It seems that the only benefit of this approach is to avoid"
>> > adding SplitEnumeratorContext#setIsProcessingBacklog."
>>
>> Yes, that's the major benefit of this counter-proposal.
>>
>> > In the target use-case, user still want to do checkpoint (though at a"
>> > larger interval) when there is backlog. And HybridSource need to know
>> the"
>> > expected checkpoint interval during backlog in order to determine
>> whether"
>> > it should keep throwing CheckpointException. Thus, we still need to add"
>> > execution.checkpointing.interval-during-backlog for user to specify
>> this"
>> > information."
>> >
>> > The downside of this approach is that it is hard to enforce the"
>> > semantics specified by execution.checkpointing.interval-during-backlog.
>> For"
>> > example, suppose execution.checkpointing.interval =3 minute and"
>> > execution.checkpointing.interval-during-backlog = 7 minutes. During the"
>> > backlog phase, checkpoint coordinator will still trigger the checkpoint"
>> > once every 3 minutes. HybridSource will need to reject 2 out of the 3"
>> > checkpoint invocation, and the effective checkpoint interval will be 9"
>> > minutes."
>>
>> Does it really matter what's the exact value of the longer interval? Can
>> not we
>> hard-code it to be 5x or 10x of the base checkpoint interval? If there is
>> a
>> notice
>> able overhead from the base interval slowing down records processing rate,
>> reducing this interval by a factor of 5x or 10x, would fix performance
>> issue for
>> vast majority of users. So a source could just skip 4 out of 5 or 9 out of
>> 10
>> checkpoints.
>>
>
> Yes, I think the exact value of the longer interval matters.
>
> The main reason we need two intervals is for jobs with a two-phase
> commit sink. The short interval typically represents the interval that a
> user can accept for the two-phase commit sink to buffer data (since it can
> only emit data when a checkpoint is triggered). And the long interval
> typically represents the maximum amount of duplicate work (in terms of
> time) that a job needs to re-do after a failover.
>
> Since there is no intrinsic relationship between the data buffer interval
> (related to processing latency) and the failover boundary, I don't think we
> can hardcode it to be 5x or 10x of the base checkpoint interval.
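
As a concrete, hedged example of setting the two intervals:
execution.checkpointing.interval is an existing Flink option, while
execution.checkpointing.interval-during-backlog is the option proposed by
this FLIP and may not exist in released Flink versions.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BacklogIntervalExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Short interval: bounds how long a two-phase-commit sink buffers data.
        conf.setString("execution.checkpointing.interval", "3 min");
        // Long interval (proposed in this FLIP): bounds the work re-done after
        // a failover while the job is still processing backlog data.
        conf.setString("execution.checkpointing.interval-during-backlog", "7 min");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job on env and call env.execute() ...
    }
}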
>
>
>> Alternatively we could introduce a config option like:
>>
>> execution.checkpointing.long-interval
>>
>> that might be re-used in the future, with more fancy algorithms, but I
>> don't see
>> much value in doing that.
>
>
>> > Overall, I think the solution is a bit hacky. I think it is preferred
>> to"
>> > throw exception only when there is indeed error. If we don't need to
>> check"
>> > a checkpoint, it is preferred to not trigger the checkpoint in the
>> first"
>> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog
>> is"
>> > probably not that much of a big deal."
>>
>> Yes it's hacky, but at least it doesn't require extending the Public API
>> for a
>> quite limited solution, that only targets one or two sources that are
>> rarely used.
>>
>
> I am not sure it is fair to say MySQL CDC source is "rarely used".
> ververica/flink-cdc-connectors GitHub repo has 4K + starts. Also, note that
> the proposed feature can be useful for CDC sources with an internal
> "backlog phase". Its usage is not limited to just the two sources mentioned
> in the FLIP.
>
>
>>
>> 
>>
>> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
>> feeling that
>&

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-03 Thread Dong Lin
> 1. This solution is not generic enough
> 2. I can see solutions that wouldn't require modification of every source
> 3. They would have zero overlap with the interfaces extension from this
> FLIP
>


I have the feeling our discussion is kind of in a loop, where you ask for a
solution without any change to the source (so that it is generic), I
explain why I am not able to find such a solution and the drawback of your
proposed solution, and then you repeat the same ask and insist this is
possible.

If you can find a solution that wouldn't require modification of every
source and still address the target use-case well, could you please kindly
rephrase your solution so that we can revisit it?

I assume this solution would not require extra config from users, would not
cause the job to use long checkpoint interval due to random/short traffic
spikes, and would not cause the job to use the short interval when the job
is still reading backlog data.

I would be happy to be proven wrong if you or anyone else can provide such a solution
without the aforementioned drawbacks. I just hope we don't block the FLIP
forever for a goal that no one can address.

Best,
Dong


>
> Best,
> Piotrek
>
> On Sat, Jul 1, 2023 at 5:01 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thank you for providing further suggestions to help improve the API.
> Please
> > see my comments inline.
> >
> > On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski 
> > wrote:
> >
> > > Hey,
> > >
> > > Sorry for a late reply, I was OoO for a week. I have three things to
> > point
> > > out.
> > >
> > > 1. ===
> > >
> > > The updated proposal is indeed better, but to be honest I still don't
> > like
> > > it, for mostly the same reasons that I have mentioned earlier:
> > > - only a partial solution, that doesn't address all use cases, so we
> > would
> > > need to throw it away sooner or later
> > > - I don't see and it hasn't been discussed how to make it work out of
> the
> > > box for all sources
> > > - somehow complicating API for people implementing Sources
> > > - it should work out of the box for most of the sources, or at least to
> > > have that potential in the future
> > >
> >
> > The comments above seem kind of "abstract". I am hoping to understand more
> > technical details behind these comments so that we can see how to address
> > the concern. For example, even if a FLIP does not address all use-cases
> > (which is arguably true for every FLIP), its solution does not necessarily
> > need to be thrown away later as long as it is extensible. So we probably
> > need to understand specifically why the proposed APIs would be thrown
> away.
> >
> > Similarly, we would need to understand if there is a better design to
> make
> > the API simpler and work out of the box etc. in order to decide how to
> > address these comments.
> >
> >
> > > On top of that:
> > > - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> > > CheckpointCoordinator to pass "isProcessingBacklog"
> > >
> >
> > I think it can be passed via the following function chain:
> > - CheckpointCoordinator invokes
> > OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
> > coordinatorsToCheckpoint) to get this information.
> > - OperatorCoordinatorHolder implements
> > OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
> > OperatorCoordinator#isProcessingBacklog (via coordinator)
> > - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog
> and
> > returns SourceCoordinatorContext#isProcessingBacklog
> > - SourceCoordinatorContext will implement
> > SplitEnumeratorContext#setIsProcessingBacklog and stores the given
> > information in a variable.
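
The following is an illustrative-only sketch of that chain; these are internal
Flink classes, and the signatures below are assumptions made to visualise the
wiring, not the actual code:

// Write side: the SplitEnumerator sets the flag on its context.
interface SplitEnumeratorContextSketch {
    void setIsProcessingBacklog(boolean isProcessingBacklog);
}

// SourceCoordinatorContext stores the flag...
class SourceCoordinatorContextSketch implements SplitEnumeratorContextSketch {
    private volatile boolean isProcessingBacklog;

    @Override
    public void setIsProcessingBacklog(boolean isProcessingBacklog) {
        this.isProcessingBacklog = isProcessingBacklog;
    }

    boolean isProcessingBacklog() {
        return isProcessingBacklog;
    }
}

// ...and SourceCoordinator, OperatorCoordinatorHolder and
// OperatorCoordinatorCheckpointContext would each forward isProcessingBacklog()
// one level up, until the CheckpointCoordinator reads it when choosing the
// checkpoint interval.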
> >
> > Note that it involves only internal APIs. We might be able to find a simpler
> > solution with fewer functions on the path. As long as the above solution
> > works without any performance or correctness issues, I think maybe we
> > should focus on the public API design and discuss the implementation in the
> > PR review?
> >
> > - the FLIP suggests to use the long checkpointing interval as long as any
> > > subtask is processing the backlog. Are you sure that's the right call?
> > What
> > > if other
> > >   sources are producing fresh records, and those fresh records are
> > reaching
> > > sinks? It could happen either with disjoint JobGraph, embarrassing
> > parallel
> > >   JobGraph (no keyB

[DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-02 Thread Dong Lin
Hi all,

I am opening this thread to discuss FLIP-327: Support stream-batch unified
operator to improve job throughput when processing backlog data. The design
doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
.

This FLIP enables a Flink job to initially operate in batch mode, achieving
high throughput while processing records that do not require low processing
latency. Subsequently, the job can seamlessly transition to stream mode for
processing real-time records with low latency. Importantly, the same state
can be utilized before and after this mode switch, making it particularly
valuable when users wish to bootstrap the job's state using historical data.

We would greatly appreciate any comments or feedback you may have on this
proposal.

Cheers,
Dong


Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-01 Thread Dong Lin
Hi Chesnay,

Thank you for your comments and I would be happy to discuss together to
find a solution.

I just want to note that the discussion thread for this FLIP has been open
for almost two months for everyone to leave comments. I will really
appreciate it if in the future you can help provide comments earlier in the
discussion thread so that I (and probably other contributors) can have the
chance to address your concerns and achieve consensus sooner rather than
later. I am hoping we can be considerate of each other and help the
community be more productive.

Thanks,
Dong

On Fri, Jun 30, 2023 at 11:18 PM Chesnay Schepler 
wrote:

> -1 (binding)
>
> I feel like this FLIP needs a bit more time in the oven.
>
> It seems to be very light on actual details; you can summarize the
> entire changes section as "The enumerator calls this method and then
> another checkpoint interval is used."
> I would love to know how this is wired into the triggering of
> checkpoints, what the behavior is with multiple sources, if a sink is
> allowed to set this at any point or just once, what the semantics of a
> "backlog" are for sources other than Hybrid/ MySQL CDC (because catching
> up after a failover is a common enough pattern), whether/how this
> information could also be interesting for the scheduler (because we may
> want to avoid rescalings during the backlog processing), whether the
> backlog processing should be exposed as a metric for users (or for that
> matter, how we inform users at all that we're using a different
> checkpoint interval at this time).
>
> Following my discussion with Piotr and Stefan I'm also not sure how
> future-proof the proposed API really is. Already I feel like the name
> "setIsProcessingBacklog()" is rather specific for the state of the
> source (making it technically wrong to call it in other situations like
> being backpressured (again, depending on what "backlog processing" even
> means)), while not being clear on what this actually results in. The
> javadocs don't even mention the checkpointing interval at all, but
> instead reference downstream optimizations that, afaict, aren't
> mentioned in the FLIP.
>
> I'd be very hesitant with marking it as public from the get-go. Ideally
> it would maybe even be added as a separate interface (somehow).
>
> On 30/06/2023 16:37, Piotr Nowojski wrote:
> > Hey,
> >
> > Sorry to disturb this voting, but after discussing this thoroughly with
> > Chesnay and Stefan Richter I have to vote:
> >   -1 (binding)
> > mainly to suspend the current voting thread. Please take a look at my
> mail
> > at dev mailing list.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Jun 29, 2023 at 2:59 PM feng xiangyu 
> > wrote:
> >
> >> +1 (non-binding)
> >>
> >> Best,
> >> Xiangyu
> >>
>>> On Thu, Jun 29, 2023 at 8:44 PM yuxia  wrote:
> >>
> >>> +1 (binding)
> >>>
> >>> Best regards,
> >>> Yuxia
> >>>
> >>> - Original Message -
> >>> From: "Yuepeng Pan" 
> >>> To: "dev" 
> >>> Sent: Thursday, June 29, 2023, 8:21:14 PM
> >>> Subject: Re: [VOTE] FLIP-309: Support using larger checkpointing interval
> when
> >>> source is processing backlog
> >>>
> >>> +1  non-binding.
> >>>
> >>>
> >>> Best.
> >>> Yuepeng Pan
> >>>
> >>>
> >>>  Replied Message 
> >>> | From | Jingsong Li |
> >>> | Date | 06/29/2023 13:25 |
> >>> | To | dev |
> >>> | Cc | flink.zhouyunfeng |
> >>> | Subject | Re: [VOTE] FLIP-309: Support using larger checkpointing
> >>> interval when source is processing backlog |
> >>> +1 binding
> >>>
> >>> On Thu, Jun 29, 2023 at 11:03 AM Dong Lin  wrote:
> >>>> Hi all,
> >>>>
> >>>> We would like to start the vote for FLIP-309: Support using larger
> >>>> checkpointing interval when source is processing backlog [1]. This
> FLIP
> >>> was
> >>>> discussed in this thread [2].
> >>>>
> >>>> Flink 1.18 release will feature freeze on July 11. We hope to make
> this
> >>>> feature available in Flink 1.18.
> >>>>
> >>>> The vote will be open until at least July 4th (at least 72 hours),
> >>> following
> >>>> the consensus voting process.
> >>>>
> >>>> Cheers,
> >>>> Yunfeng and Dong
> >>>>
> >>>> [1]
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >>>> [2] https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37
>
>
>


Re: [VOTE] FLIP-321: introduce an API deprecation process

2023-07-01 Thread Dong Lin
Thanks for the FLIP.

+1 (binding)

On Fri, Jun 30, 2023 at 5:39 PM Becket Qin  wrote:

> Hi folks,
>
> I'd like to start the VOTE for FLIP-321[1] which proposes to introduce an
> API deprecation process to Flink. The discussion thread for the FLIP can be
> found here[2].
>
> The vote will be open until at least July 4, following the consensus voting
> process.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
> [2] https://lists.apache.org/thread/vmhzv8fcw2b33pqxp43486owrxbkd5x9
>


Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-01 Thread Dong Lin
Hi Chesnay, can you put your comments in the discussion thread, so that we
can continue the technical discussion there?


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-01 Thread Dong Lin
ion(TRIGGER_CHECKPOINT_FAILURE)`?
> Actually we could also introduce a dedicated `CheckpointFailureReason` for
> that purpose and handle it some special way in some places (like maybe hide
> such rejected checkpoints from the REST API/WebUI). We could elaborate on
> this a bit more, but after a brief thinking  I could see it actually
> working well
> enough without any public facing changes. But I might be wrong here.
>
> If this feature actually grabs traction, we could expand it to something
> more sophisticated available via a public API in the future.
>

In the target use-case, user still want to do checkpoint (though at a
larger interval) when there is backlog. And HybridSource need to know the
expected checkpoint interval during backlog in order to determine whether
it should keep throwing CheckpointException. Thus, we still need to add
execution.checkpointing.interval-during-backlog for user to specify this
information.

It seems that the only benefit of this approach is to avoid
adding SplitEnumeratorContext#setIsProcessingBacklog.

The downside of this approach is that it is hard to enforce the
semantics specified by execution.checkpointing.interval-during-backlog. For
example, suppose execution.checkpointing.interval = 3 minutes and
execution.checkpointing.interval-during-backlog = 7 minutes. During the
backlog phase, the checkpoint coordinator will still trigger a checkpoint
once every 3 minutes. HybridSource will need to reject 2 out of every 3
checkpoint invocations, and the effective checkpoint interval will be 9
minutes.
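
For illustration, the arithmetic above in a small sketch (the helper name is
made up here): the effective interval rounds the backlog interval up to the
next multiple of the base interval.

import java.time.Duration;

final class EffectiveIntervalSketch {
    // e.g. base = 3 min, desiredDuringBacklog = 7 min  ->  ceil(7/3) * 3 = 9 min
    static Duration effectiveInterval(Duration base, Duration desiredDuringBacklog) {
        long multiples =
                (desiredDuringBacklog.toMillis() + base.toMillis() - 1) / base.toMillis();
        return base.multipliedBy(Math.max(1, multiples));
    }

    private EffectiveIntervalSketch() {}
}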

Overall, I think the solution is a bit hacky. I think it is preferred to
throw exception only when there is indeed error. If we don't need to check
a checkpoint, it is preferred to not trigger the checkpoint in the first
place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is
probably not that much of a big deal.

Thanks for all the comments. I am looking forward to your thoughts.

Best,
Dong


>
> ===
>
> Sorry for disturbing this FLIP discussion and voting.
>
> Best,
> Piotrek
>
> On Thu, Jun 29, 2023 at 5:08 AM feng xiangyu  wrote:
>
> > Hi Dong,
> >
> > Thanks for your quick reply. I think this has truly solved our problem
> and
> > will enable us upgrade our existing jobs more seamless.
> >
> > Best,
> > Xiangyu
> >
> > On Thu, Jun 29, 2023 at 10:50 AM Dong Lin  wrote:
> >
> > > Hi Feng,
> > >
> > > Thanks for the feedback. Yes, you can configure the
> > > execution.checkpointing.interval-during-backlog to effectively disable
> > > checkpoint during backlog.
> > >
> > > Prior to your comment, the FLIP allows users to do this by setting the
> > > config value to something large (e.g. 365 day). After thinking about
> this
> > > more, we think it is more usable to allow users to achieve this goal by
> > > setting the config value to 0. This is consistent with the existing
> > > behavior of execution.checkpointing.interval -- the checkpoint is
> > disabled
> > > if user set execution.checkpointing.interval to 0.
> > >
> > > We have updated the description of
> > > execution.checkpointing.interval-during-backlog
> > > to say the following:
> > > ... it is not null, the value must either be 0, which means the
> > checkpoint
> > > is disabled during backlog, or be larger than or equal to
> > > execution.checkpointing.interval.
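
A minimal illustrative sketch of that validation rule (the class and method
names are made up here):

import java.time.Duration;
import javax.annotation.Nullable;

final class IntervalDuringBacklogValidationSketch {
    static void validate(Duration interval, @Nullable Duration intervalDuringBacklog) {
        if (intervalDuringBacklog == null) {
            return; // not set: the regular interval also applies during backlog
        }
        if (!intervalDuringBacklog.isZero() && intervalDuringBacklog.compareTo(interval) < 0) {
            throw new IllegalArgumentException(
                    "execution.checkpointing.interval-during-backlog must be 0 (checkpointing"
                            + " disabled during backlog) or >= execution.checkpointing.interval");
        }
    }

    private IntervalDuringBacklogValidationSketch() {}
}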
> > >
> > > Does this address your need?
> > >
> > > Best,
> > > Dong
> > >
> > >
> > >
> > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu 
> > wrote:
> > >
> > > > Hi Dong and Yunfeng,
> > > >
> > > > Thanks for the proposal, your flip sounds very useful from my
> > > perspective.
> > > > In our business, when we using hybrid source in production we also
> met
> > > the
> > > > problem described in your flip.
> > > > In our solution, we tend to skip making any checkpoints before all
> > batch
> > > > tasks have finished and resume the periodic checkpoint only in
> > streaming
> > > > phrase. Within this flip, we can solve our problem in a more generic
> > way.
> > > >
> > > > However, I am wondering if we still want to skip making any
> checkpoints
> > > > during historical phrase, can we set this configuration
> > > > "execution.checkpointing.interval-during-backlog" equals "-1" to
> cover
> > > this
> > > > case?
> > > >
> > > > Best,
> > > > Xiangyu
> > > >
&

Re: [RESULT] [VOTE] Apache Flink ML Release 2.2.0, release candidate #2

2023-06-29 Thread Dong Lin
Sorry, I used the wrong title. I just sent another email with the right
title to announce the voting result.

On Fri, Jun 30, 2023 at 7:26 AM Dong Lin  wrote:

> Hi all,
>
> I'm happy to announce that we have unanimously approved this release [1].
>
> There are 5 approving votes, 3 of which are binding:
>
> - Dian Fu (binding)
> - Xingbo Huang (binding)
> - Dong Lin (binding)
> - Xin Jiang (non-binding)
> - Zhipeng Zhang (non-binding)
>
> There are no disapproving votes.
>
> Thank you all for verifying the release candidate. We will now proceed to
> finalize the release and announce it once everything is published.
>
> [1] https://lists.apache.org/thread/408q8q70zvqj3h7cvyzd5d4v7l3l6r82
>
> Cheers,
> Dong
>


[RESULT] [VOTE] Apache Flink ML Release 2.3.0, release candidate #1

2023-06-29 Thread Dong Lin
Hi all,

I'm happy to announce that we have unanimously approved this release [1].

There are 5 approving votes, 3 of which are binding:

- Dian Fu (binding)
- Xingbo Huang (binding)
- Dong Lin (binding)
- Xin Jiang (non-binding)
- Zhipeng Zhang (non-binding)

There are no disapproving votes.

Thank you all for verifying the release candidate. We will now proceed to
finalize the release and announce it once everything is published.

[1] https://lists.apache.org/thread/408q8q70zvqj3h7cvyzd5d4v7l3l6r82

Cheers,
Dong


[RESULT] [VOTE] Apache Flink ML Release 2.2.0, release candidate #2

2023-06-29 Thread Dong Lin
Hi all,

I'm happy to announce that we have unanimously approved this release [1].

There are 5 approving votes, 3 of which are binding:

- Dian Fu (binding)
- Xingbo Huang (binding)
- Dong Lin (binding)
- Xin Jiang (non-binding)
- Zhipeng Zhang (non-binding)

There are no disapproving votes.

Thank you all for verifying the release candidate. We will now proceed to
finalize the release and announce it once everything is published.

[1] https://lists.apache.org/thread/408q8q70zvqj3h7cvyzd5d4v7l3l6r82

Cheers,
Dong


Re: [VOTE] Apache Flink ML Release 2.3.0, release candidate #1

2023-06-29 Thread Dong Lin
I will also vote +1 (binding).

- Verified checksums and GPG for all maven artifacts and source
distributions
- Successfully run the Flink ML Python/Java quickstart using source
distributions.
- Verified that the source distributions do not contain any unwanted
binaries.
- Built the source distribution and ensured that all source files have
Apache headers.
- Browsed through JIRA release notes files and did not find anything
unexpected.
- Browsed through README.md files and did not find anything unexpected.

Thanks everyone for helping verify the release!

Since it has been more than 72 hours and we have got the required number of
votes, I will close the vote and proceed to finalize the release.


On Thu, Jun 29, 2023 at 10:05 PM Xingbo Huang  wrote:

> +1 (binding)
>
> - Verified the checksums and GPG files.
> - Reviewed the website PR.
> - Check the Python package apache-flink-ml-2.3.0.tar.gz
>
> Best,
> Xingbo
>
> On Thu, Jun 29, 2023 at 7:46 PM, Dian Fu  wrote:
>
> > +1 (binding)
> >
> > - Verified the checksums and signatures.
> > - The website PR LGTM
> >
> > Regards,
> > Dian
> >
> > On Thu, Jun 29, 2023 at 7:03 PM Zhipeng Zhang 
> > wrote:
> > >
> > > Thanks Dong and Xin for driving this release.
> > >
> > > +1 (non-binding)
> > >
> > > - Verified that the checksums and GPG files.
> > > - Verified that the source distributions do not contain any binaries.
> > > - Browsed through JIRA release notes files.
> > > - Browsed through README.md files.
> > >
> > > On Thu, Jun 29, 2023 at 12:08 PM, Xin Jiang  wrote:
> > > >
> > > > Hi Dong,
> > > >
> > > > Thanks for driving this release.
> > > >
> > > > +1 (non-binding)
> > > >
> > > > - Verified that the checksums and GPG files.
> > > > - Verified that the source distributions do not contain any binaries.
> > > > - Built the source distribution and run all unit tests.
> > > > - Verified that all POM files point to the same version.
> > > > - Browsed through JIRA release notes files.
> > > > - Browsed through README.md files.
> > > >
> > > >
> > > > Best Regards,
> > > > Xin
> > >
> > >
> > >
> > > --
> > > best,
> > > Zhipeng
> >
>


Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-06-29 Thread Dong Lin
Hi Shammon,

Thanks for your comments. Please see my reply inline.

On Thu, Jun 29, 2023 at 6:01 PM Shammon FY  wrote:

> Hi Dong and Yunfeng,
>
> Thanks for bringing up this discussion.
>
> As described in the FLIP, the differences between `end-to-end latency` and
> `table.exec.mini-batch.allow-latency` are: "It allows users to specify the
> end-to-end latency, whereas table.exec.mini-batch.allow-latency applies to
> each operator. If there are N operators on the path from source to sink,
> the end-to-end latency could be up to table.exec.mini-batch.allow-latency *
> N".
>
> If I understand correctly, `table.exec.mini-batch.allow-latency` is also
> applied to the end-to-end latency for a job, maybe @Jack Wu can give more
> information.
>

Based on what I can tell from the doc/code and offline discussion, I
believe table.exec.mini-batch.allow-latency is not applied to the
end-to-end latency for a job.

It is mentioned here that table.exec.mini-batch.allow-latency is "the maximum
latency can be used for MiniBatch to buffer input records". I think we should
have mentioned that
the config is applied to the end-to-end latency in this doc if it is indeed
the case.


> So, from my perspective, and please correct me if I'm misunderstand, the
> targets of this FLIP may include the following:
>
> 1. Support a mechanism like  `mini-batch` in SQL for `DataStream`, which
> will collect data in the operator and flush data when it receives a `flush`
> event, in the FLIP it is `FlushEvent`.
>

I think the goal is to allow users to specify an end-to-end latency budget
for the job. IMO it is quite different from the `mini-batch` in SQL.


>
> 2. Support dynamic `latency` according to the progress of job, such as
> snapshot stage and after that.
>
> To do that, I have some questions:
>
> 1. I didn't understand the purpose of public interface `RecordAttributes`.
> I think `FlushEvent` in the FLIP is enough, and different
> `DynamicFlushStrategy` can be added to generate flush events to address
> different needs, such as a static interval similar to mini-batch in SQL or
> collect the information `isProcessingBacklog` and metrics to generate
> `FlushEvent` which is described in your FLIP? If hudi sink needs the
> `isBacklog` flag, the hudi `SplitEnumerator` can create an operator event
> and send it to hudi source reader.
>

Suppose we only have FlushEvent, then operators (e.g. Hudi Sink) will not
know they can buffer data in the following scenario:

- execution.allowed-latency is not configured and use the default value
null.
- The job is reading from HybridSource and HybridSource says isBacklog=true.

Also note that Hudi Sink might not be the only operator that can benefit
from knowing isBacklog=true. Other sinks and aggregation operators (e.g.
CoGroup) can also increase throughput by buffering/sorting records when
there is backlog. So it seems simpler to pass RecordAttributes to these
operators than to ask every operator developer to create an operator event etc.
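
For illustration, the kind of logic such an operator could apply might look
roughly like the sketch below. This is not the proposed FLIP-325 interface;
the backlog flag is abstracted to a plain boolean and emit() stands in for the
operator's real output.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: buffer records while the backlog flag is true and
// emit them once the backlog ends, trading latency for throughput.
class BufferingOperatorSketch<T> {
    private boolean isBacklog = false;
    private final List<T> buffer = new ArrayList<>();

    void onBacklogStatusChanged(boolean newIsBacklog) {
        if (isBacklog && !newIsBacklog) {
            // Backlog ended: flush everything buffered during the backlog phase.
            for (T buffered : buffer) {
                emit(buffered);
            }
            buffer.clear();
        }
        isBacklog = newIsBacklog;
    }

    void processElement(T record) {
        if (isBacklog) {
            buffer.add(record);   // higher throughput, higher latency
        } else {
            emit(record);         // normal low-latency path
        }
    }

    private void emit(T record) {
        System.out.println(record);   // placeholder for the operator's actual output
    }
}
```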


>
> 2. How is this new mechanism unified with SQL's mini-batch mechanism? As
> far as I am concerned, SQL implements mini-batch mechanism based on
> watermark, I think it is very unreasonable to have two different
> implementation in SQL and DataStream.
>

I think we can deprecate table.exec.mini-batch.allow-latency later
once execution.allowed-latency is ready for production usage. This is
mentioned in the "Compatibility, Deprecation, and Migration Plan" section.

If there is a config that supports user specifying the e2e latency, it is
probably reasonable for this config to work for both DataStream and SQL.


> 3. I notice that the `CheckpointCoordinator` will generate `FlushEvent`,
> which information about `FlushEvent` will be stored in
>

The CheckpointCoordinator might need to send a FlushEvent before triggering a
checkpoint in order to deal with two-phase-commit sinks. The algorithm
is specified in the "Proposed Changes" section.


> `Checkpoint`? What is the alignment strategy for FlushEvent in the
> operator? The operator will flush the data when it receives all
> `FlushEvent` from upstream with the same ID or do flush for each
> `FlushEvent`? Can you give more detailed proposal about that? We also have
> a demand for this piece, thanks
>

After an operator has received a FlushEvent:
- If the ID of the received FlushEvent is larger than the largest ID this
operator has received, then flush() is triggered for this operator and the
operator should broadcast FlushEvent to downstream operators.
- Otherwise, this FlushEvent is ignored.

This behavior is specified in the Java doc of the FlushEvent.
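
For illustration, that dedup logic could be sketched as follows (the flush-event
ID is abstracted to a long here; the actual FlushEvent class and operator hooks
proposed in the FLIP may look different):

```java
// Illustrative sketch only: trigger flush() and forward the event when a flush
// event with a new, larger ID arrives; ignore duplicates from other channels.
class FlushEventDedupSketch {
    private long largestSeenFlushId = Long.MIN_VALUE;

    void onFlushEvent(long flushEventId) {
        if (flushEventId > largestSeenFlushId) {
            largestSeenFlushId = flushEventId;
            flush();                          // flush this operator's buffered records/state
            forwardDownstream(flushEventId);  // broadcast the event to downstream operators
        }
        // otherwise: a duplicate arriving from another input channel, ignore it
    }

    private void flush() { /* operator-specific */ }

    private void forwardDownstream(long flushEventId) { /* operator-specific */ }
}
```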

Can you see if this answers your questions?

Best,
Dong


>
>
> Best,
> Shammon FY
>
>
>
> On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser 
> wrote:
>
>> Hi Dong and Yunfeng,
>>
>> Thanks for the FLIP. What's not clear for me is what's the expected
>> behaviour when the allowed latency 

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-06-29 Thread Dong Lin
Hi Martijn,

Thanks for your feedback! Please see my reply inline.

On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser 
wrote:

> Hi Dong and Yunfeng,
>
> Thanks for the FLIP. What's not clear for me is what's the expected
> behaviour when the allowed latency can't be met, for whatever reason.
> Given that we're talking about an "allowed latency", it implies that
> something has gone wrong and should fail? Isn't this more a minimum
> latency that you're proposing?
>

execution.allowed-latency is not really a minimum latency because we do not
require operators/job to buffer records based on this configuration.

The actual latency can be much lower due to the following reasons:
- The job is composed of operators (e.g. simple map operation) which can
not increase throughput by delaying the processing.
- The operators in the job have not been upgraded to take advantage of the
allowed latency.
- The operator needs to flush data at a higher frequency due to limited
buffer space.

execution.allowed-latency effectively gives operators a signal that they
can choose to increase throughput at the cost of higher e2e latency. If
Flink can not meet the allowed latency, such as when Flink is
overloaded by a traffic spike, nothing particular happens (no error is
logged). This is similar to when we fail to trigger a checkpoint at the
user-specified checkpointing interval due to high checkpoint time.

The above behavior (e.g. this config is best effort) is specified in the
config's description. Can you see if the configuration's description can
address your concern?
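
For reference, setting it would look something like the sketch below, assuming
the draft key name execution.allowed-latency proposed in this FLIP (the final
key and value format may differ):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AllowedLatencySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Best-effort upper bound on the extra e2e latency introduced by buffering.
        conf.setString("execution.allowed-latency", "2 s");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job on env as usual ...
    }
}
```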

I understand the config name might look confusing. We have considered a few
other options but could not find a better one. For example,
execution.target-latency is rejected because we want latency to be lower if
the operator can not benefit from buffering records. And
execution.flush.interval is rejected because we might flush at a higher
frequency to deal with two-phase-commit sinks (see Proposed Changes
section). I am happy to change the config name based on your suggestions.


>
> There's also the part about the Hudi Sink processing records
> immediately upon arrival. Given that the SinkV2 API provides the
> ability for custom post and pre-commit topologies [1], specifically
> targeted to avoid generating multiple small files, why isn't that
> sufficient for the Hudi Sink? It would be great to see that added
> under Rejected Alternatives if this is indeed not sufficient.
>

After reading through FLIP-191, I think these two FLIPs complement each
other in addressing the small-file-compaction.

FLIP-191 provides the SinkV2 API so that sink developers can plug in custom
logic to merge small files before making the files visible. However,
without FLIP-325, sink operators have to trigger compaction and make the
compacted files visible to downstream jobs every time a checkpoint is
triggered. When the frequency of making files visible to Hudi increases,
the number of files in Hudi will also likely increase (for the same
amount of input data).

FLIP-325 can help reduce the number of files (and also increase sink
throughput) by allowing Hudi Sink to optionally reduce the frequency of
making data visible to Hudi when there is backlog.

Does this answer your question?

Best,
Dong


> Best regards,
>
> Martijn
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
>
> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
>  wrote:
> >
> > Hi all,
> >
> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > support configuring end-to-end allowed latency for Flink jobs, which
> > has been documented in FLIP-325
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency
> >.
> >
> > By configuring the latency requirement for a Flink job, users would be
> > able to optimize the throughput and overhead of the job while still
> > acceptably increasing latency. This approach is particularly useful
> > when dealing with records that do not require immediate processing and
> > emission upon arrival.
> >
> > Please refer to the FLIP document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on
> > this proposal.
> >
> > Best regards.
> >
> > Dong and Yunfeng
>


[VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-06-28 Thread Dong Lin
Hi all,

We would like to start the vote for FLIP-309: Support using larger
checkpointing interval when source is processing backlog [1]. This FLIP was
discussed in this thread [2].

The Flink 1.18 release will reach feature freeze on July 11. We hope to make
this feature available in Flink 1.18.

The vote will be open until at least July 4th (at least 72 hours), following
the consensus voting process.

Cheers,
Yunfeng and Dong

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
[2] https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Thanks everyone (and specifically Piotr) for your valuable suggestions and
review!

We will open the voting thread for this FLIP. We hope to make this feature
available in the Flink 1.18 release, which will reach feature freeze on July 11.

Piotr: we will create a followup FLIP (probably in FLIP-328
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+event-time+lag>)
to allow users to determine isBacklog dynamically based on the event-time
lag and/or source backpressure metrics.



On Thu, Jun 29, 2023 at 10:49 AM Dong Lin  wrote:

> Hi Feng,
>
> Thanks for the feedback. Yes, you can configure the
> execution.checkpointing.interval-during-backlog to effectively disable
> checkpoint during backlog.
>
> Prior to your comment, the FLIP allows users to do this by setting the
> config value to something large (e.g. 365 day). After thinking about this
> more, we think it is more usable to allow users to achieve this goal by
> setting the config value to 0. This is consistent with the existing
> behavior of execution.checkpointing.interval -- the checkpoint is
> disabled if user set execution.checkpointing.interval to 0.
>
> We have updated the description of 
> execution.checkpointing.interval-during-backlog
> to say the following:
> ... it is not null, the value must either be 0, which means the checkpoint
> is disabled during backlog, or be larger than or equal to
> execution.checkpointing.interval.
>
> Does this address your need?
>
> Best,
> Dong
>
>
>
> On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu  wrote:
>
>> Hi Dong and Yunfeng,
>>
>> Thanks for the proposal, your flip sounds very useful from my perspective.
>> In our business, when we using hybrid source in production we also met the
>> problem described in your flip.
>> In our solution, we tend to skip making any checkpoints before all batch
>> tasks have finished and resume the periodic checkpoint only in streaming
>> phase. Within this flip, we can solve our problem in a more generic way.
>>
>> However, I am wondering if we still want to skip making any checkpoints
>> during historical phase, can we set this configuration
>> "execution.checkpointing.interval-during-backlog" equals "-1" to cover
>> this
>> case?
>>
>> Best,
>> Xiangyu
>>
>> On Wed, Jun 28, 2023 at 4:30 PM, Hang Ruan  wrote:
>>
>> > Thanks for Dong and Yunfeng's work.
>> >
>> > The FLIP looks good to me. This new version is clearer to understand.
>> >
>> > Best,
>> > Hang
>> >
> > On Tue, Jun 27, 2023 at 4:53 PM, Dong Lin  wrote:
>> >
> > > Thanks Jark, Jingsong, and Zhu for the review!
>> > >
>> > > Thanks Zhu for the suggestion. I have updated the configuration name
>> as
>> > > suggested.
>> > >
>> > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:
>> > >
>> > > > Thanks Dong and Yunfeng for creating this FLIP and driving this
>> > > discussion.
>> > > >
>> > > > The new design looks generally good to me. Increasing the checkpoint
>> > > > interval when the job is processing backlogs is easier for users to
>> > > > understand and can help in more scenarios.
>> > > >
>> > > > I have one comment about the new configuration.
>> > > > Naming the new configuration
>> > > > "execution.checkpointing.interval-during-backlog" would be better
>> > > > according to Flink config naming convention.
>> > > > It is also because that nested config keys should be avoided. See
>> > > > FLINK-29372 for more details.
>> > > >
>> > > > Thanks,
>> > > > Zhu
>> > > >
> > > > > On Tue, Jun 27, 2023 at 3:45 PM, Jingsong Li  wrote:
>> > > > >
>> > > > > Looks good to me!
>> > > > >
>> > > > > Thanks Dong, Yunfeng and all for your discussion and design.
>> > > > >
>> > > > > Best,
>> > > > > Jingsong
>> > > > >
>> > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
>> > > > > >
>> > > > > > Thank you Dong for driving this FLIP.
>> > > > > >
>> > > > > > The new design looks good to me!
>> > > > > >
>> > > > > > Best,
>> > > > > > Jark
>> > > > > >
>> > > > > > > 202

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-28 Thread Dong Lin
Hi Feng,

Thanks for the feedback. Yes, you can configure the
execution.checkpointing.interval-during-backlog to effectively disable
checkpoint during backlog.

Prior to your comment, the FLIP allowed users to do this by setting the
config value to something large (e.g. 365 days). After thinking about this
more, we think it is more usable to allow users to achieve this goal by
setting the config value to 0. This is consistent with the existing
behavior of execution.checkpointing.interval -- the checkpoint is disabled
if the user sets execution.checkpointing.interval to 0.

We have updated the description of
execution.checkpointing.interval-during-backlog
to say the following:
... it is not null, the value must either be 0, which means the checkpoint
is disabled during backlog, or be larger than or equal to
execution.checkpointing.interval.
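
For example, disabling checkpoints during backlog would look roughly like this
(a sketch, using the config keys discussed in this thread):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BacklogCheckpointIntervalSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Regular checkpointing interval used once the backlog has been processed.
        conf.setString("execution.checkpointing.interval", "30 s");
        // 0 disables checkpointing while the job is processing backlog; any non-zero
        // value must be >= execution.checkpointing.interval.
        conf.setString("execution.checkpointing.interval-during-backlog", "0");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job on env as usual ...
    }
}
```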

Does this address your need?

Best,
Dong



On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu  wrote:

> Hi Dong and Yunfeng,
>
> Thanks for the proposal, your flip sounds very useful from my perspective.
> In our business, when we using hybrid source in production we also met the
> problem described in your flip.
> In our solution, we tend to skip making any checkpoints before all batch
> tasks have finished and resume the periodic checkpoint only in streaming
> phase. Within this flip, we can solve our problem in a more generic way.
>
> However, I am wondering if we still want to skip making any checkpoints
> during historical phase, can we set this configuration
> "execution.checkpointing.interval-during-backlog" equals "-1" to cover this
> case?
>
> Best,
> Xiangyu
>
> On Wed, Jun 28, 2023 at 4:30 PM, Hang Ruan  wrote:
>
> > Thanks for Dong and Yunfeng's work.
> >
> > The FLIP looks good to me. This new version is clearer to understand.
> >
> > Best,
> > Hang
> >
> > On Tue, Jun 27, 2023 at 4:53 PM, Dong Lin  wrote:
> >
> > > Thanks Jark, Jingsong, and Zhu for the review!
> > >
> > > Thanks Zhu for the suggestion. I have updated the configuration name as
> > > suggested.
> > >
> > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:
> > >
> > > > Thanks Dong and Yunfeng for creating this FLIP and driving this
> > > discussion.
> > > >
> > > > The new design looks generally good to me. Increasing the checkpoint
> > > > interval when the job is processing backlogs is easier for users to
> > > > understand and can help in more scenarios.
> > > >
> > > > I have one comment about the new configuration.
> > > > Naming the new configuration
> > > > "execution.checkpointing.interval-during-backlog" would be better
> > > > according to Flink config naming convention.
> > > > It is also because that nested config keys should be avoided. See
> > > > FLINK-29372 for more details.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Tue, Jun 27, 2023 at 3:45 PM, Jingsong Li  wrote:
> > > > >
> > > > > Looks good to me!
> > > > >
> > > > > Thanks Dong, Yunfeng and all for your discussion and design.
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > > >
> > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> > > > > >
> > > > > > Thank you Dong for driving this FLIP.
> > > > > >
> > > > > > The new design looks good to me!
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > > 2023年6月27日 14:38,Dong Lin  写道:
> > > > > > >
> > > > > > > Thank you Leonard for the review!
> > > > > > >
> > > > > > > Hi Piotr, do you have any comments on the latest proposal?
> > > > > > >
> > > > > > > I am wondering if it is OK to start the voting thread this
> week.
> > > > > > >
> > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu 
> > > > wrote:
> > > > > > >
> > > > > > >> Thanks Dong for driving this FLIP forward!
> > > > > > >>
> > > > > > >> Introducing  `backlog status` concept for flink job makes
> sense
> > to
> > > > me as
> > > > > > >> following reasons:
> > > > > > >>
> > > > > > >> From concept/API design perspective, it’s more 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thanks Jark, Jingsong, and Zhu for the review!

Thanks Zhu for the suggestion. I have updated the configuration name as
suggested.

On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu  wrote:

> Thanks Dong and Yunfeng for creating this FLIP and driving this discussion.
>
> The new design looks generally good to me. Increasing the checkpoint
> interval when the job is processing backlogs is easier for users to
> understand and can help in more scenarios.
>
> I have one comment about the new configuration.
> Naming the new configuration
> "execution.checkpointing.interval-during-backlog" would be better
> according to Flink config naming convention.
> It is also because that nested config keys should be avoided. See
> FLINK-29372 for more details.
>
> Thanks,
> Zhu
>
> On Tue, Jun 27, 2023 at 3:45 PM, Jingsong Li  wrote:
> >
> > Looks good to me!
> >
> > Thanks Dong, Yunfeng and all for your discussion and design.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu  wrote:
> > >
> > > Thank you Dong for driving this FLIP.
> > >
> > > The new design looks good to me!
> > >
> > > Best,
> > > Jark
> > >
> > > > On Jun 27, 2023, at 2:38 PM, Dong Lin  wrote:
> > > >
> > > > Thank you Leonard for the review!
> > > >
> > > > Hi Piotr, do you have any comments on the latest proposal?
> > > >
> > > > I am wondering if it is OK to start the voting thread this week.
> > > >
> > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu 
> wrote:
> > > >
> > > >> Thanks Dong for driving this FLIP forward!
> > > >>
> > > >> Introducing  `backlog status` concept for flink job makes sense to
> me as
> > > >> following reasons:
> > > >>
> > > >> From concept/API design perspective, it’s more general and natural
> than
> > > >> above proposals as it can be used in HybridSource for bounded
> records, CDC
> > > >> Source for history snapshot and general sources like KafkaSource for
> > > >> historical messages.
> > > >>
> > > >> From user cases/requirements, I’ve seen many users manually to set
> larger
> > > >> checkpoint interval during backfilling and then set a shorter
> checkpoint
> > > >> interval for real-time processing in their production environments
> as a
> > > >> flink application optimization. Now, the flink framework can make
> this
> > > >> optimization no longer require the user to set the checkpoint
> interval and
> > > >> restart the job multiple times.
> > > >>
> > > >> Following supporting using larger checkpoint for job under backlog
> status
> > > >> in current FLIP, we can explore supporting larger
> parallelism/memory/cpu
> > > >> for job under backlog status in the future.
> > > >>
> > > >> In short, the updated FLIP looks good to me.
> > > >>
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >>
> > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin 
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > >>>
> > > >>> After discussing with Becket Qin and thinking about this more, I
> agree it
> > > >>> is a better idea to add a top-level concept to all source
> operators to
> > > >>> address the target use-case.
> > > >>>
> > > >>> The main reason that changed my mind is that isProcessingBacklog
> can be
> > > >>> described as an inherent/nature attribute of every source instance
> and
> > > >> its
> > > >>> semantics does not need to depend on any specific checkpointing
> policy.
> > > >>> Also, we can hardcode the isProcessingBacklog behavior for the
> sources we
> > > >>> have considered so far (e.g. HybridSource and MySQL CDC source)
> without
> > > >>> asking users to explicitly configure the per-source behavior, which
> > > >> indeed
> > > >>> provides better user experience.
> > > >>>
> > > >>> I have updated the FLIP based on the latest suggestions. The
> latest FLIP
> > > >> no
> > > &g

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-27 Thread Dong Lin
Thank you Leonard for the review!

Hi Piotr, do you have any comments on the latest proposal?

I am wondering if it is OK to start the voting thread this week.

On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu  wrote:

> Thanks Dong for driving this FLIP forward!
>
> Introducing  `backlog status` concept for flink job makes sense to me as
> following reasons:
>
> From concept/API design perspective, it’s more general and natural than
> above proposals as it can be used in HybridSource for bounded records, CDC
> Source for history snapshot and general sources like KafkaSource for
> historical messages.
>
> From user cases/requirements, I’ve seen many users manually set a larger
> checkpoint interval during backfilling and then set a shorter checkpoint
> interval for real-time processing in their production environments as a
> flink application optimization. Now, the flink framework can make this
> optimization no longer require the user to set the checkpoint interval and
> restart the job multiple times.
>
> Following supporting using larger checkpoint for job under backlog status
> in current FLIP, we can explore supporting larger parallelism/memory/cpu
> for job under backlog status in the future.
>
> In short, the updated FLIP looks good to me.
>
>
> Best,
> Leonard
>
>
> > On Jun 22, 2023, at 12:07 PM, Dong Lin  wrote:
> >
> > Hi Piotr,
> >
> > Thanks again for proposing the isProcessingBacklog concept.
> >
> > After discussing with Becket Qin and thinking about this more, I agree it
> > is a better idea to add a top-level concept to all source operators to
> > address the target use-case.
> >
> > The main reason that changed my mind is that isProcessingBacklog can be
> > described as an inherent/nature attribute of every source instance and
> its
> > semantics does not need to depend on any specific checkpointing policy.
> > Also, we can hardcode the isProcessingBacklog behavior for the sources we
> > have considered so far (e.g. HybridSource and MySQL CDC source) without
> > asking users to explicitly configure the per-source behavior, which
> indeed
> > provides better user experience.
> >
> > I have updated the FLIP based on the latest suggestions. The latest FLIP
> no
> > longer introduces per-source config that can be used by end-users. While
> I
> > agree with you that CheckpointTrigger can be a useful feature to address
> > additional use-cases, I am not sure it is necessary for the use-case
> > targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> > in another FLIP?
> >
> > Can you help take another look at the updated FLIP?
> >
> > Best,
> > Dong
> >
> >
> >
> > On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi Dong,
> >>
> >>> Suppose there are 1000 subtask and each subtask has 1% chance of being
> >>> "backpressured" at a given time (due to random traffic spikes). Then at
> >> any
> >>> given time, the chance of the job
> >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate
> the
> >>> backpressure metric once a second, the estimated time for the job
> >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) =
> 23163
> >>> sec = 6.4 hours.
> >>>
> >>> This means that the job will effectively always use the longer
> >>> checkpointing interval. It looks like a real concern, right?
> >>
> >> Sorry I don't understand where you are getting those numbers from.
> >> Instead of trying to find loophole after loophole, could you try to
> think
> >> how a given loophole could be improved/solved?
> >>
> >>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>> following reasons.
> >>
> >> Please propose something. I don't think it's needed.
> >>
> >>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> APIs
> >>> of this alternative approach be more or less usable?
> >>
> >> Everything that you originally wanted to achieve in FLIP-309, you could
> do
> >> as well in my proposal.
> >> Vide my many mentions of the "hacky solution".
> >>
> >>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>> checkpointing interval to change dynamically even during the unbounded
> >>> phase) as it claims?
> >>
> >> I don't see why not.
> >>
> >>> - Can thes

[VOTE] Apache Flink ML Release 2.3.0, release candidate #1

2023-06-26 Thread Dong Lin
Hi everyone,

We would like to start voting for the Flink ML 2.3.0 release. This
release primarily
provides the ability to run Flink ML on Flink 1.15, 1.16 and 1.17.

Please review and vote on the release candidate #1 for version 2.3.0 of
Apache Flink ML as follows.

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

**Testing Guideline**

You can find here [1] a page in the project wiki on instructions for
testing.

To cast a vote, it is not necessary to perform all listed checks, but
please mention which checks you have performed when voting.

**Release Overview**

As an overview, the release consists of the following:
a) Flink ML source release to be deployed to dist.apache.org
b) Flink ML Python source distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository

**Staging Areas to Review**

The staging areas containing the above-mentioned artifacts are as follows, for
your review:

- All artifacts for a) and b) can be found in the corresponding dev repository
at dist.apache.org [2], which are signed with the key with fingerprint AFAC
DB09 E6F0 FF28 C93D  64BC BEED 4F6C B9F7 7D0E [3]
- All artifacts for c) can be found at the Apache Nexus Repository [4]

**Other links for your review**

- JIRA release notes [5]
- Source code tag "release-2.3.0-rc1" [6]
- PR to update the website Downloads page to include Flink ML links [7]

**Vote Duration**

The voting time will run for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.


Cheers,
Dong


[1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.3.0-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1645/
[5]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353096
[6] https://github.com/apache/flink-ml/releases/tag/release-2.3.0-rc1
[7] https://github.com/apache/flink-web/pull/659


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-21 Thread Dong Lin
interval,
> for those that do know how to do that, user would have to configure every
> source
> individually and yet again we would end up with a system, that works only
> partially in
> some special use cases (HybridSource), that's confusing the users even
> more.
>
> That's why I think the more generic solution, working primarily on the same
> metrics that are used by various auto scaling solutions (like Flink K8s
> operator's
> autosaler) would be better. The hacky solution I proposed to:
> 1. show you that the generic solution is simply a superset of your proposal
> 2. if you are adamant that busyness/backpressured/records processing
> rate/pending records
> metrics wouldn't cover your use case sufficiently (imo they can), then
> you can very easily
> enhance this algorithm with using some hints from the sources. Like
> "processingBacklog==true"
> to short circuit the main algorithm, if `processingBacklog` is
> available.
>
> Best,
> Piotrek
>
>
> pt., 16 cze 2023 o 04:45 Dong Lin  napisał(a):
>
> > Hi again Piotr,
> >
> > Thank you for the reply. Please see my reply inline.
> >
> > On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi again Dong,
> > >
> > > > I understand that JM will get the backpressure-related metrics every
> > time
> > > > the RestServerEndpoint receives the REST request to get these
> metrics.
> > > But
> > > > I am not sure if RestServerEndpoint is already always receiving the
> > REST
> > > > metrics at regular interval (suppose there is no human manually
> > > > opening/clicking the Flink Web UI). And if it does, what is the
> > interval?
> > >
> > > Good catch, I've thought that metrics are pre-emptively sent to JM
> every
> > 10
> > > seconds.
> > > Indeed that's not the case at the moment, and that would have to be
> > > improved.
> > >
> > > > I would be surprised if Flink is already paying this much overhead
> just
> > > for
> > > > metrics monitoring. That is the main reason I still doubt it is true.
> > Can
> > > > you show where this 100 ms is currently configured?
> > > >
> > > > Alternatively, maybe you mean that we should add extra code to invoke
> > the
> > > > REST API at 100 ms interval. Then that means we need to considerably
> > > > increase the network/cpu overhead at JM, where the overhead will
> > increase
> > > > as the number of TM/slots increase, which may pose risk to the
> > > scalability
> > > > of the proposed design. I am not sure we should do this. What do you
> > > think?
> > >
> > > Sorry. I didn't mean metric should be reported every 100ms. I meant
> that
> > > "backPressuredTimeMsPerSecond (metric) would report (a value of)
> > 100ms/s."
> > > once per metric interval (10s?).
> > >
> >
> > Suppose there are 1000 subtask and each subtask has 1% chance of being
> > "backpressured" at a given time (due to random traffic spikes). Then at
> any
> > given time, the chance of the job
> > being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> > backpressure metric once a second, the estimated time for the job
> > to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> > sec = 6.4 hours.
> >
> > This means that the job will effectively always use the longer
> > checkpointing interval. It looks like a real concern, right?
> >
> >
> > > > - What is the interface of this CheckpointTrigger? For example, are
> we
> > > > going to give CheckpointTrigger a context that it can use to fetch
> > > > arbitrary metric values? This can help us understand what information
> > > this
> > > > user-defined CheckpointTrigger can use to make the checkpoint
> decision.
> > >
> > > I honestly don't think this is important at this stage of the
> discussion.
> > > It could have
> > > whatever interface we would deem to be best. Required things:
> > >
> > > - access to at least a subset of metrics that the given
> > `CheckpointTrigger`
> > > requests,
> > >   for example via some registration mechanism, so we don't have to
> fetch
> > > all of the
> > >   metrics all the time from TMs.
> > > - some way to influence `CheckpointCoordinator`. Either via manually
> > > triggering
> > >   chec

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-16 Thread Dong Lin
 do this yet without
> > having users explicitly provide this information on a per-source basis.
> >
> > Suppose the job read from a bounded Kafka source, should it emit
> > "processingBacklog=true"? If yes, then the job might use long
> checkpointing
> > interval even
> > if the job is asked to process data starting from now to the next 1 hour.
> > If no, then the job might use the short checkpointing interval
> > even if the job is asked to re-process data starting from 7 days ago.
>
> Yes. The same can be said of your proposal. Your proposal has the very same
> issues
> that every source would have to implement it differently, most sources
> would
>

Hmm.. I don't think my proposal has the same issue. We probably have a
different understanding on how these two solutions work.

With my approach in FLIP-309, only selected sources with bounded/unbounded
phases (e.g. HybridSource, MySQL CDC Source) need to implement the API for
users to specify checkpointing upper-bound at selected phases. All other
sources (e.g. Kafka, FIleSystem) won't be affected.

Since your proposal adds this job-level config based on processingBacklog
of source operators, that means all source operators need to explicitly
document what the value of their processingBacklog is. If the "processingBacklog"
value is not defined for a source, that effectively means the job-level
config's behavior is undefined. This is a much more involved change than
FLIP-309 since it effectively introduces a new first-class concept for all
source operators.

Note that the overhead of this concept is fundamentally higher than the
overhead of introducing the "isBackPressured" metric. The reason is that
currently these metrics are used to help humans find issues and trigger
alerts. It is OK for the definition of these metrics to be a bit vague and
for values to be a bit inaccurate (e.g. occasionally deviate from the
accurate values by 20%).

On the other hand, the checkpointing interval directly impacts the data
freshness of a Flink job (when there is an exactly-once sink) and it might not
be OK for the checkpointing interval to occasionally deviate from the expected
choice by 20%. That is the reason I think we need to clearly define the
semantics of processingBacklog and its value for every source so that users
know the expected behavior of Flink with the given config values.

What do you think?


> have no idea how to properly calculate the new requested checkpoint
> interval,

> for those that do know how to do that, user would have to configure every
> source
>

If a user uses HybridSource without specifying the extra config, or uses the
Kafka source (which does not provide the extra config), the Flink job will
use the existing execution.checkpointing.interval. Users would not need to
configure every source individually.

In the cases where users use MySQL CDC Source or HybridSource (which
typically is a small percentage of all Flink jobs), most likely users only
need to configure the extra checkpointing interval upper bound on one source.

I am not sure why the solution in FLIP-309 works only "partially" in some
special use cases. Can you help explain when this solution would not work?


> individually and yet again we would end up with a system, that works only
> partially in
> some special use cases (HybridSource), that's confusing the users even
> more.
>
> That's why I think the more generic solution, working primarily on the same
> metrics that are used by various auto scaling solutions (like Flink K8s
> operator's
> autoscaler) would be better. The hacky solution I proposed to:
> 1. show you that the generic solution is simply a superset of your proposal
> 2. if you are adamant that busyness/backpressured/records processing
> rate/pending records
> metrics wouldn't cover your use case sufficiently (imo they can), then
>

To be clear, I think the solution based on the CheckpointTrigger +
algorithm can be useful in some cases (e.g. when the source has
event-time). And I would be happy to draft a FLIP separately to take
advantage of this idea.

For the use case targeted by FLIP-309, I just think the solution based on
the algorithm might not work as expected in some 1-5% of cases (which is hard
to upper-bound) due to the loopholes mentioned earlier.

Best,
Dong

you can very easily
> enhance this algorithm with using some hints from the sources. Like
> "processingBacklog==true"
> to short circuit the main algorithm, if `processingBacklog` is
> available.
>
> Best,
> Piotrek
>
>
> pt., 16 cze 2023 o 04:45 Dong Lin  napisał(a):
>
> > Hi again Piotr,
> >
> > Thank you for the reply. Please see my reply inline.
> >
> > On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> &

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-15 Thread Dong Lin
equire all source operators to specify how they enforce this concept (e.g.
FileSystemSource always emits processingBacklog=true). And there might be
cases where the source itself (e.g. a bounded Kafka Source) can not
automatically derive the value of this concept, in which case we need to
provide option for users to explicitly specify the value for this concept
on a per-source basis.



> > And it probably also has the issues of "having two places to configure
> checkpointing
> > interval" and "giving flexibility for every source to implement a
> different
> > API" (as mentioned below).
>
> No, it doesn't.
>
> > IMO, it is a hard-requirement for the user-facing API to be
> > clearly defined and users should be able to use the API without concern
> of
> > regression. And this requirement is more important than the other goals
> > discussed above because it is related to the stability/performance of the
> > production job. What do you think?
>
> I don't agree with this. There are many things that work something in
> between perfectly and well enough
> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%), while
> still being very useful.
> Good examples are: selection of state backend, unaligned checkpoints,
> buffer debloating but frankly if I go
> through list of currently available config options, something like half of
> them can cause regressions. Heck,
> even Flink itself doesn't work perfectly in 100% of the use cases, due to a
> variety of design choices. Of
> course, the more use cases are fine with said feature, the better, but we
> shouldn't fixate to perfectly cover
> 100% of the cases, as that's impossible.
>
> In this particular case, if back pressure monitoring  trigger can work well
> enough in 95% of cases, I would
> say that's already better than the originally proposed alternative, which
> doesn't work at all if user has a large
> backlog to reprocess from Kafka, including when using HybridSource AFTER
> the switch to Kafka has
> happened. For the remaining 5%, we should try to improve the behaviour over
> time, but ultimately, users can
> decide to just run a fixed checkpoint interval (or at worst use the hacky
> checkpoint trigger that I mentioned
> before a couple of times).
>
> Also to be pedantic, if a user naively selects slow-interval in your
> proposal to 30 minutes, when that user's
> job fails on average every 15-20minutes, his job can end up in a state that
> it can not make any progress,
> this arguably is quite serious regression.
>

I probably should not say it is a "hard requirement". After all, there are
pros/cons. We will need to consider implementation complexity, usability,
extensibility etc.

I just don't think we should take it for granted that we can introduce a
regression for one use-case in order to support another use-case. If we can
not find an algorithm/solution that addresses
both use-cases well, I hope we can be open to tackling them separately so that
users can choose the option that best fits their needs.

All else being equal, I think it is preferable for a user-facing API to
be clearly defined and for users to be able to use the API without
concern of regression.

Maybe we can list the pros/cons of the alternative approaches we have been
discussing and choose the best approach. And maybe we will end up
finding that the use-case
which needs CheckpointTrigger can be tackled separately from the use-case
in FLIP-309.


> > I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond
> =
> > 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
> > 1000 * metricsUpdateInterval according to the above algorithm.
> >
> > Do you mean "maxRecordsConsumedWithoutBackpressure =
> (numRecordsInPerSecond
> > / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
>
> It looks like there is indeed some mistake in my proposal above. Yours look
> more correct, it probably
> still needs some safeguard/special handling if
> `backPressuredTimeMsPerSecond > 950`
>
> > The only information it can access is the backlog.
>
> Again no. It can access whatever we want to provide to it.
>
> Regarding the rest of your concerns. It's a matter of tweaking the
> parameters and the algorithm itself,
> and how much safety-net do we want to have. Ultimately, I'm pretty sure
> that's a (for 95-99% of cases)
> solvable problem. If not, there is always the hacky solution, that could be
> even integrated into this above
> mentioned algorithm as a short circuit to always reach `slow-interval`.
>
> Apart of that, you picked 3 minutes as the checkpointing interval in your
> counter example. In most cases
> any interval above 1 minute would inflict pretty

[jira] [Created] (FLINK-32360) Optimize DataStream#coGroup in stream mode when results are emitted at end of input

2023-06-15 Thread Dong Lin (Jira)
Dong Lin created FLINK-32360:


 Summary: Optimize DataStream#coGroup in stream mode when results 
are emitted at end of input
 Key: FLINK-32360
 URL: https://issues.apache.org/jira/browse/FLINK-32360
 Project: Flink
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

2023-06-14 Thread Dong Lin
Thanks Feng for the FLIP.

+1(binding)

Cheers,
Dong

On Wed, Jun 14, 2023 at 10:35 AM Feng Jin  wrote:

> Hi everyone
>
> Thanks for all the feedback about the FLIP-295: Support lazy initialization
> of catalogs and persistence of catalog configurations[1].
> [2] is the discussion thread.
>
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours(excluding weekends,until June 19, 10:00AM GMT) unless there is an
> objection or an insufficient number of votes.
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> [2]https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
>
>
> Best,
> Feng
>


Re: [VOTE] FLIP-312: Add Yarn ACLs to Flink Containers

2023-06-06 Thread Dong Lin
+1 (binding)

Thank you Archit for driving the FLIP.

Cheers,
Dong

On Tue, Jun 6, 2023 at 12:55 AM Archit Goyal 
wrote:

> Hi everyone,
>
> Thanks for all the feedback for FLIP-312: Add Yarn ACLs to Flink
> Containers<
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+312%3A+Add+Yarn+ACLs+to+Flink+Containers
> >.
> Following is the discussion thread : Link<
> https://lists.apache.org/thread/xj3ytkwj9lsl3hpjdb4n8pmy7lk3l8tv>
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (until June 9th, 12:00AM GMT) unless there is an objection or an
> insufficient number of votes.
>
> Thanks,
> Archit Goyal
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Dong Lin
Hi Piotr,

Thanks for the explanations. I have some followup questions below.

On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski  wrote:

> Hi All,
>
> Thanks for chipping in the discussion Ahmed!
>
> Regarding using the REST API. Currently I'm leaning towards implementing
> this feature inside the Flink itself, via some pluggable interface.
> REST API solution would be tempting, but I guess not everyone is using
> Flink Kubernetes Operator.
>
> @Dong
>
> > I am not sure metrics such as isBackPressured are already sent to JM.
>
> Fetching code path on the JM:
>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>
> Example code path accessing Task level metrics via JM using the
> `MetricStore`:
>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>

Thanks for the code reference. I checked the code that invoked these two
classes and found the following information:

- AggregatingSubtasksMetricsHandler#getStores is currently invoked only
when AggregatingJobsMetricsHandler is invoked.
- AggregatingJobsMetricsHandler is only instantiated and returned by
WebMonitorEndpoint#initializeHandlers
- WebMonitorEndpoint#initializeHandlers is only used by RestServerEndpoint.
And RestServerEndpoint invokes these handlers in response to external REST
requests.

I understand that JM will get the backpressure-related metrics every time
the RestServerEndpoint receives the REST request to get these metrics. But
I am not sure if RestServerEndpoint is already always receiving such REST
requests at a regular interval (suppose there is no human manually
opening/clicking the Flink Web UI). And if it does, what is the interval?



> > For example, let's say every source operator subtask reports this metric
> to
> > JM once every 10 seconds. There are 100 source subtasks. And each subtask
> > is backpressured roughly 10% of the total time due to traffic spikes (and
> > limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> > chance that there is at least one subtask that is backpressured. Then we
> > have to wait for at least 10 seconds to check again.
>
> backPressuredTimeMsPerSecond and other related metrics (like
> busyTimeMsPerSecond) are not subject to that problem.
> They are recalculated once every metric fetching interval, and they report
> accurately on average the given subtask spent busy/idling/backpressured.
> In your example, backPressuredTimeMsPerSecond would report 100ms/s.


Suppose every subtask is already reporting backPressuredTimeMsPerSecond to
JM once every 100 ms. If a job has 10 operators (that are not chained) and
each operator has 100 subtasks, then JM would need to handle 10,000 requests
per second to receive metrics from these 1000 subtasks. It seems like a
non-trivial overhead for medium-to-large sized jobs and can make JM the
performance bottleneck during job execution.

I would be surprised if Flink is already paying this much overhead just for
metrics monitoring. That is the main reason I still doubt it is true. Can
you show where this 100 ms is currently configured?

Alternatively, maybe you mean that we should add extra code to invoke the
REST API at 100 ms interval. Then that means we need to considerably
increase the network/cpu overhead at JM, where the overhead will increase
as the number of TM/slots increase, which may pose risk to the scalability
of the proposed design. I am not sure we should do this. What do you think?


>
> > While it will be nice to support additional use-cases
> > with one proposal, it is probably also reasonable to make incremental
> > progress and support the low-hanging-fruit use-case first. The choice
> > really depends on the complexity and the importance of supporting the
> extra
> > use-cases.
>
> That would be true, if that was a private implementation detail or if the
> low-hanging-fruit-solution would be on the direct path to the final
> solution.
> That's unfortunately not the case here. This will add public facing API,
> that we will later need to maintain, no matter what the final solution will
> be,
> and at the moment at least I don't see it being related to a "perfect"
> solution.


Sure. Then let's decide the final solution first.


> > I guess the point is that the suggested approach, which dynamically
> > determines the checkpointing interval based on the backpressure, may
> cause
> > regression when the checkpointing interval is relatively low. This makes
> it
> > hard for users to enable this feature in production. It is like an
> > auto-driving system that is not guaranteed to work
>
> Yes, creating a more generic solution that would require less configuration
> is usually more difficult then static configurations.
> It doesn't mean we shouldn't try to do that. Especially that if my proposed
> algorithm wouldn't work good enough, there is
> an obvious solution, that any source could add a 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-01 Thread Dong Lin
Hi Ahmed,

Thanks for the comments.

I agree with you and Piotr that it would be useful to provide a more
generic approach to address more use-case in one proposal. On the other
hand, I also think it is important to make sure that the alternative (more
generic) approach can indeed address the extra use-cases reliably as
expected. Then we can compare the pros/cons of these approaches and make
the best choice for Flink users.

If I understand your question correctly, you are asking whether it would be
better to replace upperBoundCheckpointingIntervalForLastSource() with an
API on the source/operator interface.

The short answer is probably no. This is because the expected users of the
API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
are end-users who use the Flink API and connector API to develop Flink jobs. We
probably don't want end-users to directly use the source/operator
interface, which is generally more complicated and intended to be used by
developers of source operators.
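
For illustration, end-user code against this builder-level API might look
roughly like the sketch below. The upperBoundCheckpointingIntervalForLastSource(...)
name follows the draft in this thread, historicalFileSource/realTimeKafkaSource
are placeholder sources, and execution.checkpointing.interval is assumed to be
set to a long interval (e.g. 30 minutes) for the bounded phase:

```java
// Sketch only: the upper-bound method is the draft FLIP-309 API; the surrounding
// HybridSource builder calls are the existing API. Requires java.time.Duration.
HybridSource<String> source =
        HybridSource.builder(historicalFileSource)       // bounded, historical phase
                .addSource(realTimeKafkaSource)          // unbounded, real-time phase
                // While the last source (the real-time phase) is reading data,
                // upper-bound the checkpointing interval to e.g. 30 seconds.
                .upperBoundCheckpointingIntervalForLastSource(Duration.ofSeconds(30))
                .build();
```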

FLIP-309 currently proposes to add the API
*SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
checkpointing interval. Are you suggesting that we should replace this API
with a config on the source or operator constructor?

This approach probably works for HybridSource. But I am not sure it works
for MySQL CDC Source (which is also mentioned in the latest FLIP-309
motivation section), which is implemented as one source operator rather
than multiple source operators (which HybridSource does). And we need to
enable the new checkpointing interval in the middle of this source
operator's execution.

If I misunderstood your suggestion, can you provide more details regarding
the proposed API and explain its benefits?

Best,
Dong



On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy  wrote:

> Hi Dong,
> Thanks for the great proposal.
> The thread is very intuitive along with suggestions from Jing and Piotr.
> As much as I like the simplicity of the proposed approach I think a much
> wider benefit is achieved by taking a more generic approach similar to
> Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> solidifies the argument you are discussing
> >  On the other hand, the approach currently proposed in the FLIP is much
> simpler as it does not depend on backpressure. Users specify the extra
> interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
> Source) and can easily know the checkpointing interval will be used on the
> continuous phase of the corresponding source.
>
> where the base HybridSource can use a `CheckpointTrigger` that doesn't
> depend on backpressure.
>
>
>
>
> I have a couple of questions for clarification.
>
> @Dong
> Do you think in the solution in FLIP 309, instead of using
> ```
> /**
>  * Upper-bound the checkpointing interval when the last source
> added right before this
>  * method invocation is reading data.
>  */
> public  Source ?, ?>>
> HybridSourceBuilder
> upperBoundCheckpointingIntervalForLastSource(
> Duration duration) {
> ...
> }
> ```
>
> We can have an upperBoundCheckpointingInterval configured in the Source
> Interface, or even better in the Operator one.
> then we can easily implement the one for HybridSource by relying on
> delegation to the `currentReader`.
>
>
> @Piotr
>
> Regarding the more general approach of adjusting based on generic
> triggers/backpressure metrics. I saw you mentioned the resemblance with
> FLIP-271,
> Do you think it is worth going with the REST API proposal for dynamically
> configuring the interval hence the trigger logic could be implemented on
> Flink or external systems like Flink Kubernetes Operator?
> Wdyt? I think the REST API proposal here sounds more and more interesting.
>
>
> Best Regards,
> Ahmed Hamdy
>
>
> On Wed, 31 May 2023 at 07:59, Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Dong,
> > >
> > > First of all we don't need to send any extra signal from source (or non
> > > source) operators. All of the operators are already reporting
> > backpressured
> > > metrics [1]
> > > and all of the metrics are already sent to JobManager. We would only
> need
> > >
> >
> > Hmm... I am not sure metrics such as isBackPressured are already sent to
> > JM. According to the doc
> > <
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > >,
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-31 Thread Dong Lin
a
> lower value, or just use static checkpoint interval strategy.
>

I guess the point is that the suggested approach, which dynamically
determines the checkpointing interval based on the backpressure, may cause
regression when the checkpointing interval is relatively low. This makes it
hard for users to enable this feature in production. It is like an
auto-driving system that is not guaranteed to work

On the other hand, the approach currently proposed in the FLIP is much
simpler as it does not depend on backpressure. Users specify the extra
interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
Source) and can easily know which checkpointing interval will be used during
the continuous phase of the corresponding source. This is pretty much the
same as how users use the existing execution.checkpointing.interval config,
so there is no extra concern of regression caused by this approach.
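
For illustration, here is a rough sketch of how this would look from the
user's side. The builder method below is only the one proposed in the FLIP
(and quoted earlier in this thread), so it does not exist in Flink today; the
sources, generic types and the 30-second value are placeholders:

import java.time.Duration;

import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;

public class HybridSourceIntervalSketch {

    // boundedFileSource covers the historical catch-up phase and
    // unboundedKafkaSource the continuous phase; both are placeholders.
    static <T> HybridSource<T> build(
            Source<T, ?, ?> boundedFileSource, Source<T, ?, ?> unboundedKafkaSource) {
        return HybridSource.builder(boundedFileSource)
                .addSource(unboundedKafkaSource)
                // Proposed in FLIP-309: checkpoint at most every 30 seconds once
                // the last (continuous) source is active. The job-level
                // execution.checkpointing.interval still applies before that.
                .upperBoundCheckpointingIntervalForLastSource(Duration.ofSeconds(30))
                .build();
    }
}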


>
> > With the suggested approach, the e2e latency introduced by Flink is
> roughly
> > 72 seconds. This is because it takes 1 minute for 11MBps phase to end,
> and
> > another 12 seconds for the accumulated backlog to be cleared. And Flink
> can
> > not do checkpoint before the backlog is cleared.
>
> Indeed that's a valid concern. After thinking more about this issue, maybe
> the proper solution would be to calculate "how much overloaded is the most
> overloaded subtask".
> In this case, that would be 10% (we are trying to push 110% of the
> available capacity in the current job/cluster). Then we could use that
> number as some kind of weighted average.
> We could figure out a function mapping the overload percentage, into a
> floating point number from range [0, 1]
>
> f(overload_factor) = weight // weight is from [0, 1]
>
> and then the desired checkpoint interval would be something like
>
> (1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval
>
> In your problematic example, we would like the weight to be pretty small
> (<10%?), so the calculated checkpoint interval would be pretty close to the
> fastCheckpointInterval.
>

Hmm... I am not sure it will always be pretty close to the
fastCheckpointInterval. We can discuss further once there is a concrete
definition of this algorithm.

While each source subtask can measure its current throughput, I am not sure
it can measure the "input throughput", which is defined as the throughput the
subtask would achieve if it (and its downstream operators) had unlimited
processing capacity. Therefore, it seems pretty hard to determine the
"overload_factor" in a timely and accurate way.


> The overload factor we could calculate the same way as FLIP-271 is
> calculating how much should we rescale given operator [4].
>
> I can think about this more and elaborate/refine this idea tomorrow.
>

Sounds good. Looking forward to learning more ideas.

Best,
Dong


>
> Best,
> Piotrek
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> [3]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
>
> wt., 30 maj 2023 o 13:58 Dong Lin  napisał(a):
>
> > Hi Piotr,
> >
> > Thank you for providing those details.
> >
> > I understand you suggested using the existing "isBackPressured" signal to
> > determine whether we should use the less frequent checkpointing
> interval. I
> > followed your thoughts and tried to make it work. Below are the issues
> that
> > I am not able to address. Can you see if there is a way to address these
> > issues?
> >
> > Let's will use the following use-case to make the discussion more
> concrete:
> > a) Users want to checkpoint at least once every 30 minutes to upper-bound
> > the amount of duplicate work after job failover.
> > b) Users want to checkpoint at least once every 30 seconds to
> > upper-bound *extra
> > e2e lag introduced by the Flink job* during the continuous processing
> > phase.
> >
> > The suggested approach is designed to do this:
> > - If any of the source subtasks is backpressured, the job will checkpoint
> > at 30-minutes interval.
> > - If none of the source subtasks is backpressured, the job will
> checkpoint
> > at 30-seconds interval.
> >
> > And we would need to add the following public APIs to implement this
> > approach:
> > - Add a job level config, maybe
> > execution.checkpointing.interval.no-backpressure. This is the
> checkpointing
> > interval when none of the source subtasks is back

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-30 Thread Dong Lin
>   - event time unreliability
>   - lack of universal threshold value for `pendingRecords`
>
> In a bit more detail, we probably should check (using [1] or [2]) either:
>   a) if any of the source subtasks is backpressured
>   b) if any of the subtasks is backpressured
>
> In most cases a == b. The only time when that's not true, if some windowed
> operator in the middle of the job graph started triggering so many results
> that it became backpressured,
> but the backpressure didn't last long enough to propagate to sources. For
> example that especially might occur if sources are idle. So probably b) is
> a better and more generic option.
>
> Regarding your last concern, with spiky traffic, I think the following
> algorithm of triggering checkpoints would work pretty well:
>
> public class BackpressureDetectingCheckpointTrigger {
>
>     private long lastCheckpointTs = System.currentTimeMillis();
>     private long slowCheckpointInterval = ...;
>     private long fastCheckpointInterval = ...;
>
>     // code executed periodically, for example once a second, once every 10ms,
>     // or at 1/10th of the fast checkpoint interval
>     void maybeTriggerCheckpoint(...) {
>
>         long nextCheckpointTs = lastCheckpointTs;
>         if (isAnySubtaskBackpressured()) {
>             nextCheckpointTs += slowCheckpointInterval;
>         } else {
>             nextCheckpointTs += fastCheckpointInterval;
>         }
>
>         // trigger once the current time has reached the scheduled checkpoint time
>         if (nextCheckpointTs <= System.currentTimeMillis()) {
>             triggerCheckpoint();
>             lastCheckpointTs = System.currentTimeMillis();
>         }
>     }
> }
>
> This way, if there is a spike of backpressure, it doesn't matter that much.
> If the backpressure goes away until the next iteration, the next check will
> trigger a checkpoint according to the
> fast interval. The slow checkpoint interval will be used only if the
> backpressure persists for the whole duration of the slowCheckpointInterval.
>
> We could also go a little bit more fancy, and instead of using only fast or
> slow intervals, we could use a continuous spectrum to gradually adjust the
> interval, by replacing the first if/else
> check with a weighted average:
>
>   int maxBackPressureTime = getSubtaskMaxBackPressuredTimeMsPerSecond();
>   long nextCheckpointTs = lastCheckpointTs
>       + (slowCheckpointInterval * maxBackPressureTime
>           + fastCheckpointInterval * (1000 - maxBackPressureTime)) / 1000;
>
> This would further eliminate some potential jitter and make the actual
> checkpoint interval a bit more predictable.
>
> Best,
> Piotrek
>
>
> wt., 30 maj 2023 o 04:40 Dong Lin  napisał(a):
>
> > Let me correct the typo in the last paragraph as below:
> >
> > To make the problem even harder, the incoming traffic can be spiky. And
> the
> > overhead of triggering checkpointing can be relatively low, in which case
> > it might be more performant (w.r.t. e2e lag) for the Flink job to
> > checkpoint at the more frequent interval in the continuous phase in face
> of
> > a spike in the number of pending records buffered in the source operator.
> >
> >
> > On Tue, May 30, 2023 at 9:17 AM Dong Lin  wrote:
> >
> > > Hi Piotrek,
> > >
> > > Thanks for providing more details of the alternative approach!
> > >
> > > If I understand your proposal correctly, here are the requirements for
> it
> > > to work without incurring any regression:
> > >
> > > 1) The source needs a way to determine whether there exists
> backpressure.
> > > 2) If there is backpressure, then it means e2e latency is already high
> > > and there should be no harm to use the less frequent checkpointing
> > interval.
> > > 3) The configuration of the "less frequent checkpointing interval"
> needs
> > > to be a job-level config so that it works for sources other than
> > > HybridSource.
> > >
> > > I would say that if we can find a way for the source to determine the
> > > "existence of backpressure" and meet the requirement 2), it would
> indeed
> > be
> > > a much more elegant approach that solves more use-cases.
> > >
> > > The devil is in the details. I am not sure how to determine the
> > "existence
> > > of backpressure". Let me explain my thoughts and maybe you can help
> > > provide the answers.
> > >
> > > To make the discussion more concrete, let's say the input records do
> not
> > > have event timestamps. Users want to checkpoint at least once every 30
> > > minutes to upper-bound the amount of duplicate work after job failover.
> > And
> > > users want to c

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
Let me correct the typo in the last paragraph as below:

To make the problem even harder, the incoming traffic can be spiky. And the
overhead of triggering checkpointing can be relatively low, in which case
it might be more performant (w.r.t. e2e lag) for the Flink job to
checkpoint at the more frequent interval in the continuous phase in face of
a spike in the number of pending records buffered in the source operator.


On Tue, May 30, 2023 at 9:17 AM Dong Lin  wrote:

> Hi Piotrek,
>
> Thanks for providing more details of the alternative approach!
>
> If I understand your proposal correctly, here are the requirements for it
> to work without incurring any regression:
>
> 1) The source needs a way to determine whether there exists backpressure.
> 2) If there is backpressure, then it means e2e latency is already high
> and there should be no harm to use the less frequent checkpointing interval.
> 3) The configuration of the "less frequent checkpointing interval" needs
> to be a job-level config so that it works for sources other than
> HybridSource.
>
> I would say that if we can find a way for the source to determine the
> "existence of backpressure" and meet the requirement 2), it would indeed be
> a much more elegant approach that solves more use-cases.
>
> The devil is in the details. I am not sure how to determine the "existence
> of backpressure". Let me explain my thoughts and maybe you can help
> provide the answers.
>
> To make the discussion more concrete, let's say the input records do not
> have event timestamps. Users want to checkpoint at least once every 30
> minutes to upper-bound the amount of duplicate work after job failover. And
> users want to checkpoint at least once every 30 seconds to upper-bound *extra
> e2e lag introduced by the Flink job* during the continuous processing
> phase.
>
> Since the input records do not have event timestamps, we can not rely on
> metrics such as currentFetchEventTimeLag [1] to determine the absolute e2e
> lag, because currentFetchEventTimeLag depends on the existence of event
> timestamps.
>
> Also note that, even if the input records have event timestamps and we can
> measure currentFetchEventTimeLag, we still need a threshold to determine
> whether the value of currentFetchEventTimeLag is too high. One idea might
> be to use the user-specified "less frequent checkpointing interval" as
> this threshold, which in this case is 30 seconds. But this approach can
> also cause regression. For example, let's say the records go through
> several Kafka/MirrorMaker pipelines after it is generated and before it is
> received by Flink, causing its currentFetchEventTimeLag to be always higher
> than 30 seconds. Then Flink will end up always using the "less frequent
> checkpointing interval" in the continuous phase, which in this case is 30
> minutes.
>
> Other options to determine the "existence of backpressure" includes using
> the absolute number of records in the source storage system that are
> waiting to be fetched (e.g. pendingRecords [1]), or using the absolute
> number of buffered records in the source output queue. However, I find it
> hard to reliably determine "e2e latency is already high" based on the
> absolute number of records. What threshold should we choose to determine
> that the number of pending records is too many (and it is safe to increase
> the checkpointing interval)?
>
> To make the problem even harder, the incoming traffic can be spiky. And
> the overhead of triggering checkpointing can be relative low, in which case
> it might be more performance (w.r.t. e2e lag) for the Flink job to
> checkpoint at the higher interval in the continuous phase in face of a
> spike in the number of pending records buffered in the source operator.
>
> The problems described above are the main reasons that I can not find a
> way to make the alternative approach work. Any thoughts?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
>
> On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski 
> wrote:
>
>> Hi
>>
>> @Jing
>>
>> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
>> It
>> > makes sense to build it as a generic feature in Flink. Looking forward
>> to
>> > it. However, for some user cases, e.g. when users were aware of the
>> bounded
>> > sources (in the HybridSource) and care more about the throughput, the
>> > dynamic adjustment might not be required. Just let those bounded sources
>> > always have larger checkpoint intervals even when there is no back
>> > pressure. Because no one cares about lat

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-29 Thread Dong Lin
> > Boundedness in Flink is a top level concept. I think it should be ok to
> > introduce a top level config for the top level concept. I am not familiar
> > with MySQL CDC. For those specific cases, you are right, your proposal
> can
> > provide the feature with minimal changes, like I mentioned previously, it
> > is a thoughtful design.  +1
> >
> > @Piotr
> >
> > > For example join (windowed/temporal) of two tables backed by a hybrid
> > > source? I could easily see a scenario where one table with little data
> > > catches up much more quickly.
> >
> > I am confused. I thought we were talking about HybridSource which "solves
> > the problem of sequentially reading input from heterogeneous sources to
> > produce a single input stream."[2]
> > I could not find any join within a HybridSource. So, your might mean
> > something else the join example and it should be out of the scope, if I
> am
> > not mistaken.
> >
> > > About the (un)boundness of the input stream. I'm not sure if that
> should
> > > actually matter. Actually the same issue, with two frequent
> checkpointing
> > > during a catch up period or when Flink is overloaded, could affect jobs
> > > that are purely unbounded, like continuously reading from Kafka. Even
> > more,
> > > nothing prevents users from actually storing bounded data in a Kafka
> > topic.
> > > Either way, I would like to refine my earlier idea, and instead of
> using
> > > metrics like `pendingRecords`, I think we could switch between fast and
> > > slow checkpointing intervals based on the information if the job is
> > > backpressured or not. My thinking is as follows:
> >
> > This is again a very different use case from HybridSource. Users are
> > indeed allowed to store bounded data in Kafka, and if it is not used as the
> > last source in a HybridSource, it is a bounded source and can still benefit
> > from a larger checkpoint interval w.r.t. throughput (Kafka or any other
> > storage does not matter). BTW, the larger checkpoint interval for a bounded
> > source is optional; users can use it but do not have to, if they don't care
> > about the throughput of bounded data.
> >
> > Your proposal to dynamically adjust the checkpoint intervals is elegant!
> It
> > makes sense to build it as a generic feature in Flink. Looking forward to
> > it. However, for some user cases, e.g. when users were aware of the
> bounded
> > sources (in the HybridSource) and care more about the throughput, the
> > dynamic adjustment might not be required. Just let those bounded sources
> > always have larger checkpoint intervals even when there is no back
> > pressure. Because no one cares about latency in this case, let's turn off
> > the dynamic adjustment, reduce the checkpoint frequency, have better
> > throughput, and save unnecessary source consumption. Did I miss anything
> > here?
> >
> > Best regards,
> > Jing
> >
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source
> >
> >
> > On Thu, May 25, 2023 at 3:03 PM Dong Lin  wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the discussion. Please see my comments inline.
> > >
> > > On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the discussion.
> > > >
> > > > @Dong
> > > >
> > > > > In the target use-case, we would like to HybridSource to trigger>
> > > > checkpoint more frequently when it is read the Kafka Source (than
> when
> > it
> > > > > is reading the HDFS source). We would need to set a flag for the
> > > > checkpoint
> > > > > trigger to know which source the HybridSource is reading from.
> > > >
> > > > Is this really your actual goal? Should users care if some table
> > defined
> > > in
> > > >
> > >
> > > My actual goal is to address the use-case described in the motivation
> > > section. More specifically,
> > > my goal is to provide an API that users can use to express their needed
> > > checkpointing interval
> > > at differe

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-25 Thread Dong Lin
I would like to refine my earlier idea, and instead of using
> metrics like `pendingRecords`, I think we could switch between fast and
> slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
>
> As a user, I would like to have my regular fast checkpointing interval for
> low latency, but the moment my system is not keeping up, if the
> backpressure builds up, or simply we have a huge backlog to reprocess,
> latency doesn't matter anymore. Only throughput matters. So I would like
> the checkpointing to slow down.
>
> I think this should cover pretty well most of the cases, what do you think?
>

Thank you for all the comments and the idea. I like it. We actually
considered this idea before proposing the FLIP.

In order to make this idea work, we need to come up with a good algorithm
that can dynamically change the checkpointing interval based on a
"backlog signal", without causing regression w.r.t. failover time and data
freshness. I find it hard to come up with such an algorithm because the
available "backlog signal" is insufficient.

For the use-case mentioned in the motivation section, the data in the
source does not have event timestamps to help determine the amount of
backlog. So the only source-of-truth for determining backlog is the amount
of data buffered in operators. But the buffer size is typically chosen to
be proportional to round-trip-time and throughput. Having a full buffer
does not necessarily mean that the data is lagging behind. And increasing
the checkpointing interval with insufficient "backlog signal" can have a
negative impact on data freshness and failover time.

In order to make this idea work, we would need to *prove* that the
algorithm would not hurt data freshness and failover time when it decides to
increase the checkpointing interval. For now I could not come up with such an
algorithm.

> If this backpressure-based behaviour is still not enough, I would still
> say that we should provide pluggable checkpoint triggering controllers that
> would work based on metrics.


I am not sure how to address the use-case mentioned in the motivation
section with the pluggable checkpoint trigger + metrics. Can you help
provide the definition of these APIs and explain how they would address the
mentioned use-case?

In the mentioned use-case, users want to have two different checkpointing
intervals at different phases of the HybridSource. We should provide an API
for users to express the extra checkpointing interval in addition to the
existing execution.checkpointing.interval. What would be the definition of
that API with this alternative approach?
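
To make the discussion concrete, one possible shape of such a pluggable
trigger could be something like the sketch below. This is purely illustrative:
none of these types or method names exist in Flink today, and the metric
snapshot passed in is just an assumption.

import java.time.Duration;
import java.util.Map;

// Purely illustrative: a trigger like this would be configured on the
// CheckpointCoordinator, similarly to how a MetricReporter is configured.
public interface CheckpointTrigger {

    // Invoked periodically by the checkpoint scheduler with a snapshot of
    // job-level metrics (e.g. pendingRecords or backPressuredTimeMsPerSecond
    // per subtask). Returns true if a checkpoint should be triggered now.
    boolean shouldTriggerCheckpoint(
            Map<String, Number> metricsSnapshot, Duration timeSinceLastCheckpoint);
}

Even with such an interface, the question above remains: what would a
HybridSource-specific trigger look at in order to know which phase the source
is in, and how would users express the two intervals?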

Best,
Dong


>
> Best,
> Piotrek
>
> czw., 25 maj 2023 o 07:47 Dong Lin  napisał(a):
>
> > Hi Jing,
> >
> > Thanks for your comments!
> >
> > Regarding the idea of using the existing "boundedness" attribute of
> > sources, that is indeed something that we might find intuitive
> initially. I
> > have thought about this idea, but could not find a good way to make it
> > work. I will try to explain my thoughts and see if we can find a better
> > solution.
> >
> > Here is my understanding of the idea mentioned above: provide a job level
> > config execution.checkpoint.interval.bounded. Flink will use this as the
> > checkpointing interval whenever there exists at least one running source
> > which claims it is under the "bounded" stage.
> >
> > Note that we can not simply re-use the existing "boundedness" attribute
> of
> > source operators. The reason is that for sources such as MySQL CDC, its
> > boundedness can be "continuous_unbounded" because it can run
> continuously.
> > But MySQL CDC has two phases internally, where the source needs to first
> > read a snapshot (with bounded amount of data) and then read a binlog
> (with
> > unbounded amount of data).
> >
> > As a result, in order to support optimization for sources like MySQL CDC,
> we
> > need to expose an API for the source operator to declare whether it is
> > running at a bounded or continuous_unbounded stage. *This introduces the
> > need to define a new concept named "bounded stage".*
> >
> > Then, we will need to *introduce a new contract between source operators
> > and the Flink runtime*, saying that if there is a source that claims it
> is
> > running at the bounded stage, then Flink will use the "
> > execution.checkpoint.interval.bounded" as the checkpointing interval.
> >
> > Here are the concerns I have with this approach:
> >
> > - The execution.checkpoint.interval.bounded is a top-level config,
> meaning
> > that every Flink us

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-24 Thread Dong Lin
ins
> historical data that has to be processed before the real-time data is
> allowed to be processed. Otherwise, up-to-date data will be overwritten by
> out-of-date data which turns out to be unexpected results in real business
> scenarios.
>
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
>
> On Tue, May 23, 2023 at 5:53 PM Dong Lin  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the comments. Let me try to understand your concerns and
> > hopefully address the concerns.
> >
> > >> What would happen if there are two (or more) operator coordinators
> with
> > conflicting desired checkpoint trigger behaviour
> >
> > With the proposed change, there won't exist any "*conflicting* desired
> > checkpoint trigger" by definition. Both job-level config and the proposed
> > API upperBoundCheckpointingInterval() means the upper-bound of the
> > checkpointing interval. If there are different upper-bounds proposed by
> > different source operators and the job-level config, Flink will try to
> > periodically trigger checkpoints at the interval corresponding to the
> > minimum of all these proposed upper-bounds.
> >
> > >> If one source is processing a backlog and the other is already
> > processing real time data..
> >
> > Overall, I am not sure we always want to have a longer checkpointing
> > interval. That really depends on the specific use-case and the job graph.
> >
> > The proposed API change mechanism for operators and users to specify
> > different checkpoint intervals at different periods of the job. Users
> have
> > the option to use the new API to get better performance in the use-case
> > specified in the motivation section. I believe there can be use-case
> where
> > the proposed API is not useful, in which case users can choose not to use
> > the API without incurring any performance regression.
> >
> > >> it might be a bit confusing and not user friendly to have multiple
> > places that can override the checkpointing behaviour in a different way
> >
> > Admittedly, adding more APIs always incur more complexity. But sometimes
> we
> > have to incur this complexity to address new use-cases. Maybe we can see
> if
> > there are more user-friendly way to address this use-case.
> >
> > >> already implemented and is simple from the perspective of Flink
> >
> > Do you mean that the HybridSource operator should invoke the rest API to
> > trigger checkpoints? The downside of this approach is that it makes it
> hard
> > for developers of source operators (e.g. MySQL CDC, HybridSource) to
> > address the target use-case. AFAIK, there is no existing case where we
> > require operator developers to use REST API to do their job.
> >
> > Can you help explain the benefit of using REST API over using the
> proposed
> > API?
> >
> > Note that this approach also seems to have the same downside mentioned
> > above: "multiple places that can override the checkpointing behaviour". I
> > am not sure there can be a solution to address the target use-case
> without
> > having multiple places that can affect the checkpointing behavior.
> >
> > >> check if `pendingRecords` for some source has exceeded the configured
> > threshold and based on that adjust the checkpointing interval accordingly
> >
> > I am not sure this approach can address the target use-case in a better
> > way. In the target use-case, we would like to HybridSource to trigger
> > checkpoint more frequently when it is read the Kafka Source (than when it
> > is reading the HDFS source). We would need to set a flag for the
> checkpoint
> > trigger to know which source the HybridSource is reading from. But IMO
> the
> > approach is less intuitive and more complex than having the HybridSource
> > invoke upperBoundCheckpointingInterval() directly once it is reading
> Kafka
> > Source.
> >
> > Maybe I did not understand the alternative approach rightly. I am happy
> to
> > discuss more on this topic. WDYT?
> >
> >
> > Best,
> > Dong
> >
> > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for the proposal. However, are you sure that the
> > > OperatorCoordinator is the right place to place such logic? What would
> > > happen if there are two (or more) operator coordinators w

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to understand your concerns and
hopefully address them.

>> What would happen if there are two (or more) operator coordinators with
conflicting desired checkpoint trigger behaviour

With the proposed change, there won't exist any "*conflicting* desired
checkpoint trigger" by definition. Both the job-level config and the proposed
API upperBoundCheckpointingInterval() mean the upper bound of the
checkpointing interval. If there are different upper bounds proposed by
different source operators and the job-level config, Flink will try to
periodically trigger checkpoints at the interval corresponding to the
minimum of all these proposed upper bounds.
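
As a minimal sketch of the intended semantics (names here are illustrative
only, not a proposed class):

import java.time.Duration;
import java.util.List;

final class EffectiveCheckpointInterval {

    private EffectiveCheckpointInterval() {}

    // The effective periodic checkpointing interval is the minimum of the
    // job-level execution.checkpointing.interval and all upper bounds
    // proposed by operators.
    static Duration compute(Duration jobLevelInterval, List<Duration> proposedUpperBounds) {
        Duration effective = jobLevelInterval;
        for (Duration bound : proposedUpperBounds) {
            if (bound.compareTo(effective) < 0) {
                effective = bound;
            }
        }
        return effective;
    }
}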

>> If one source is processing a backlog and the other is already
processing real time data..

Overall, I am not sure we always want to have a longer checkpointing
interval. That really depends on the specific use-case and the job graph.

The proposed API change provides a mechanism for operators and users to
specify different checkpoint intervals at different periods of the job. Users
have the option to use the new API to get better performance in the use-case
specified in the motivation section. I believe there can be use-cases where
the proposed API is not useful, in which case users can choose not to use
the API without incurring any performance regression.

>> it might be a bit confusing and not user friendly to have multiple
places that can override the checkpointing behaviour in a different way

Admittedly, adding more APIs always incurs more complexity. But sometimes we
have to incur this complexity to address new use-cases. Maybe we can see if
there is a more user-friendly way to address this use-case.

>> already implemented and is simple from the perspective of Flink

Do you mean that the HybridSource operator should invoke the REST API to
trigger checkpoints? The downside of this approach is that it makes it hard
for developers of source operators (e.g. MySQL CDC, HybridSource) to
address the target use-case. AFAIK, there is no existing case where we
require operator developers to use the REST API to do their job.

Can you help explain the benefit of using the REST API over the proposed
API?

Note that this approach also seems to have the same downside mentioned
above: "multiple places that can override the checkpointing behaviour". I
am not sure there can be a solution to address the target use-case without
having multiple places that can affect the checkpointing behavior.

>> check if `pendingRecords` for some source has exceeded the configured
threshold and based on that adjust the checkpointing interval accordingly

I am not sure this approach can address the target use-case in a better
way. In the target use-case, we would like the HybridSource to trigger
checkpoints more frequently when it is reading the Kafka Source (than when it
is reading the HDFS source). We would need to set a flag for the checkpoint
trigger so that it knows which source the HybridSource is reading from. But
IMO that approach is less intuitive and more complex than having the
HybridSource invoke upperBoundCheckpointingInterval() directly once it is
reading the Kafka Source.
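
For reference, a rough sketch of what this could look like on the
HybridSource side. Everything below is illustrative:
upperBoundCheckpointingInterval(...) is only the API proposed in this FLIP,
and the switch callback and context type are simplified stand-ins for the
actual enumerator code:

import java.time.Duration;

class HybridSourceEnumeratorSketch {

    // Stand-in for the coordinator context carrying the proposed FLIP-309 method.
    interface CoordinatorContextSketch {
        void upperBoundCheckpointingInterval(Duration interval);
    }

    private final Duration continuousPhaseInterval = Duration.ofSeconds(30);

    // Called when the enumerator switches from the bounded (e.g. HDFS) source
    // to the last, continuous (e.g. Kafka) source.
    void onSwitchToLastSource(CoordinatorContextSketch context) {
        // Tighten the checkpointing interval only for the continuous phase;
        // during the bounded catch-up phase the job-level interval applies.
        context.upperBoundCheckpointingInterval(continuousPhaseInterval);
    }
}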

Maybe I did not understand the alternative approach correctly. I am happy to
discuss this topic further. WDYT?


Best,
Dong

On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski 
wrote:

> Hi,
>
> Thanks for the proposal. However, are you sure that the
> OperatorCoordinator is the right place to place such logic? What would
> happen if there are two (or more) operator coordinators with conflicting
> desired checkpoint trigger behaviour? If one source is processing a backlog
> and the other is already processing real time data, I would assume that in
> most use cases you would like to still have the longer checkpointing
> interval, not the shorter one. Also apart from that, it might be a bit
> confusing and not user friendly to have multiple places that can override
> the checkpointing behaviour in a different way.
>
> FIY in the past, we had some discussions about similar requests and back
> then we chose to keep the system simpler, and exposed a more generic REST
> API checkpoint triggering mechanism. I know that having to implement such
> logic outside of Flink and having to call REST calls to trigger checkpoints
> might not be ideal, but that's already implemented and is simple from the
> perspective of Flink.
>
> I don't know, maybe instead of adding this logic to operator coordinators,
> `CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that
> the user could configure like a `MetricReporter`. The default one would be
> just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured threshold and
> based on that adjust the checkpointing interval accordingly? This would at
> least address some of my concerns.
>
> WDYT?
>
> Best,
> Piotrek
>
> [1]
> 

[RESULT] [VOTE] Apache Flink ML Release 2.2.0, release candidate #2

2023-04-18 Thread Dong Lin
I'm happy to announce that we have unanimously approved this release [1].

There are 4 approving votes, 3 of which are binding:

- Guowei Ma (binding)
- Xingbo Huang (binding)
- Dong Lin (binding)
- Zhipeng Zhang (non-binding)

There are no disapproving votes.

Thank you for verifying the release candidate. We will now proceed to
finalize the release and announce it once everything is published.

[1] https://lists.apache.org/thread/2w84ym18bzwfsd7mk30nqdo55ppc51qf

Cheers,
Dong


[VOTE] Apache Flink ML Release 2.2.0, release candidate #2

2023-04-13 Thread Dong Lin
Hi everyone,

Please review and vote on the release candidate #2 for version 2.2.0 of
Apache Flink ML as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

**Testing Guideline**

You can find here [1] a page in the project wiki on instructions for
testing.

To cast a vote, it is not necessary to perform all listed checks, but
please mention which checks you have performed when voting.

**Release Overview**

As an overview, the release consists of the following:
a) Flink ML source release to be deployed to dist.apache.org
b) Flink ML Python source distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository

**Staging Areas to Review**

The staging areas containing the above-mentioned artifacts are as follows, for
your review:

- All artifacts for a) and b) can be found in the corresponding dev repository
at dist.apache.org [2], which are signed with the key with fingerprint AFAC
DB09 E6F0 FF28 C93D 64BC BEED 4F6C B9F7 7D0E [3]
- All artifacts for c) can be found at the Apache Nexus Repository [4]

**Other links for your review**

- JIRA release notes [5]
- Source code tag "release-2.2.0-rc2" [6]
- PR to update the website Downloads page to include Flink ML links [7]

**Vote Duration**

The voting time will run for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.


Cheers,
Dong


[1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.2.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1605/
[5]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351884
[6] https://github.com/apache/flink-ml/releases/tag/release-2.2.0-rc2
[7] https://github.com/apache/flink-web/pull/630


[jira] [Created] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)
Dong Lin created FLINK-31753:


 Summary: Support DataStream CoGroup in stream Mode with similar 
performance as DataSet CoGroup
 Key: FLINK-31753
 URL: https://issues.apache.org/jira/browse/FLINK-31753
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Dong Lin
Assignee: Dong Lin
 Fix For: ml-2.3.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Releasing Flink ML 2.2.0

2023-04-03 Thread Dong Lin
Thanks everyone for the comments!

We will go ahead to release Flink ML 2.2.0. Please see here
 for the
release plan.

Best Regards,
Dong


On Fri, Mar 31, 2023 at 6:50 PM Yu Li  wrote:

> +1. Great to know the (exciting) progress and thanks for the efforts!
>
> Best Regards,
> Yu
>
>
> On Fri, 31 Mar 2023 at 14:39, Fan Hong  wrote:
>
>> Hi Dong and Zhipeng,
>>
>> Thanks for starting the discussion. Glad to see a new release of Flink ML.
>>
>> Cheers!
>>
>> On Fri, Mar 31, 2023 at 2:34 PM Zhipeng Zhang 
>> wrote:
>>
>> > Hi Dong,
>> >
>> > Thanks for starting the discussion. +1 for the Flink ML 2.2.0 release.
>> >
>>
>


[jira] [Created] (FLINK-31681) Network connection timeout between operators should trigger either network re-connection or job failover

2023-03-31 Thread Dong Lin (Jira)
Dong Lin created FLINK-31681:


 Summary: Network connection timeout between operators should 
trigger either network re-connection or job failover
 Key: FLINK-31681
 URL: https://issues.apache.org/jira/browse/FLINK-31681
 Project: Flink
  Issue Type: Bug
Reporter: Dong Lin


If a network connection error occurs between two operators, the upstream 
operator may log the following error message in the method 
PartitionRequestQueue#handleException and subsequently close the connection. 
When this happens, the Flink job may become stuck without completing or 
failing. 

To avoid this issue, we can either allow the upstream operator to reconnect 
with the downstream operator, or enable job failover so that users can take 
corrective action promptly.

org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered 
error while consuming partitions 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors#NativeIOException: 
writeAccess(...) failed: Connection timed out.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-29 Thread Dong Lin
Congratulations!

Dong

On Mon, Mar 27, 2023 at 5:24 PM Yu Li  wrote:

> Dear Flinkers,
>
>
>
> As you may have noticed, we are pleased to announce that Flink Table Store 
> has joined the Apache Incubator as a separate project called Apache 
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
> streaming data lake platform for high-speed data ingestion, change data 
> tracking and efficient real-time analytics, with the vision of supporting a 
> larger ecosystem and establishing a vibrant and neutral open source community.
>
>
>
> We would like to thank everyone for their great support and efforts for the 
> Flink Table Store project, and warmly welcome everyone to join the 
> development and activities of the new project. Apache Flink will continue to 
> be one of the first-class citizens supported by Paimon, and we believe that 
> the Flink and Paimon communities will maintain close cooperation.
>
>
> Dear Flinkers,
>
>
> As you may have noticed, we are pleased to announce that Flink Table Store
> has officially joined the Apache Incubator as a separate project [1] [2] [3].
> The new project is named Apache Paimon (incubating) and remains dedicated to
> building a next-generation streaming lakehouse platform for high-speed data
> ingestion, streaming data subscription and efficient real-time analytics. In
> addition, the new project will support a richer ecosystem and build a vibrant
> and neutral open source community.
>
>
> We would like to thank everyone for their strong support of and contributions
> to the Flink Table Store project, and we warmly welcome everyone to join the
> development and community activities of the new project. Apache Flink will
> continue to be one of the primary compute engines supported by Paimon, and we
> believe the Flink and Paimon communities will maintain close cooperation.
>
>
> Best Regards,
>
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
>
> Best regards,
>
> Yu Li (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
>
> [1] https://paimon.apache.org/
>
> [2] https://github.com/apache/incubator-paimon
>
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>

