Re: [ANNOUNCE] New Apache Flink Committer - Junrui Li

2024-11-05 Thread Lijie Wang
Congratulations, Junrui!

Best,
Lijie

On Tue, Nov 5, 2024 at 20:00, Zhu Zhu wrote:

> Hi everyone,
>
> On behalf of the PMC, I'm happy to announce that Junrui Li has become a
> new Flink Committer!
>
> Junrui has been an active contributor to the Apache Flink project for two
> years. He has been the driver and major developer of 8 FLIPs, and has
> contributed 100+ commits with tens of thousands of lines of code.
>
> His contributions mainly focus on enhancing Flink's batch execution
> capabilities, including enabling parallelism inference by default
> (FLIP-283), supporting progress recovery after JM failover (FLIP-383), and
> supporting adaptive optimization of the logical execution plan
> (FLIP-468/469). Furthermore, Junrui did a lot of work to improve Flink's
> configuration layer, addressing technical debt and enhancing its
> user-friendliness. He is also active on the mailing lists, participating in
> discussions and answering user questions.
>
> Please join me in congratulating Junrui Li for becoming an Apache Flink
> committer.
>
> Best,
> Zhu (on behalf of the Flink PMC)
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-05 Thread Lijie Wang
Congratulations, Rui!

Best,
Lijie

On Wed, Jun 5, 2024 at 21:35, Rodrigo Meneses wrote:

> All the best
>
> On Wed, Jun 5, 2024 at 5:56 AM xiangyu feng  wrote:
>
> > Congratulations, Rui!
> >
> > Regards,
> > Xiangyu Feng
> >
> > On Wed, Jun 5, 2024 at 20:42, Feng Jin wrote:
> >
> > > Congratulations, Rui!
> > >
> > >
> > > Best,
> > > Feng Jin
> > >
> > > On Wed, Jun 5, 2024 at 8:23 PM Yanfei Lei  wrote:
> > >
> > > > Congratulations, Rui!
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> > > > On Wed, Jun 5, 2024 at 20:08, Luke Chen wrote:
> > > > >
> > > > > Congrats, Rui!
> > > > >
> > > > > Luke
> > > > >
> > > > > On Wed, Jun 5, 2024 at 8:02 PM Jiabao Sun wrote:
> > > > >
> > > > > > Congrats, Rui. Well-deserved!
> > > > > >
> > > > > > Best,
> > > > > > Jiabao
> > > > > >
> > > > > > On Wed, Jun 5, 2024 at 19:29, Zhanghao Chen wrote:
> > > > > >
> > > > > > > Congrats, Rui!
> > > > > > >
> > > > > > > Best,
> > > > > > > Zhanghao Chen
> > > > > > > 
> > > > > > > From: Piotr Nowojski 
> > > > > > > Sent: Wednesday, June 5, 2024 18:01
> > > > > > > To: dev ; rui fan <1996fan...@gmail.com>
> > > > > > > Subject: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > On behalf of the PMC, I'm very happy to announce another new Apache
> > > > > > > Flink PMC Member - Fan Rui.
> > > > > > >
> > > > > > > Rui has been active in the community since August 2019. During this
> > > > > > > time he has contributed a lot of new features. Among others:
> > > > > > >   - Decoupling the Autoscaler from the Kubernetes Operator, and
> > > > > > >     supporting the Standalone Autoscaler
> > > > > > >   - Improvements to checkpointing, flame graphs, restart strategies,
> > > > > > >     watermark alignment, and network shuffles
> > > > > > >   - Optimizing the memory and CPU usage of large operators, greatly
> > > > > > >     reducing the risk and probability of TaskManager OOM
> > > > > > >
> > > > > > > He has reviewed a significant number of PRs and has been active both
> > > > > > > on the mailing lists and in Jira, helping to both maintain and grow
> > > > > > > Apache Flink's community. He is also our current Flink 1.20 release
> > > > > > > manager.
> > > > > > >
> > > > > > > In the last 12 months, Rui has been the most active contributor in
> > > > > > > the Flink Kubernetes Operator project, while being the 2nd most
> > > > > > > active Flink contributor at the same time.
> > > > > > >
> > > > > > > Please join me in welcoming and congratulating Fan Rui!
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek (on behalf of the Flink PMC)
> > > > > > >
> > > > > >
> > > >
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo

2024-06-04 Thread Lijie Wang
Congratulations, Weijie!

Best,
Lijie

On Tue, Jun 4, 2024 at 20:45, Zakelly Lan wrote:

> Congratulations, Weijie!
>
> Best,
> Zakelly
>
> On Tue, Jun 4, 2024 at 7:49 PM Sergey Nuyanzin wrote:
>
> > Congratulations, Weijie Guo!
> >
> > On Tue, Jun 4, 2024, 13:45 Jark Wu  wrote:
> >
> > > Congratulations, Weijie!
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 4 Jun 2024 at 19:10, spoon_lz  wrote:
> > >
> > > > Congratulations, Weijie!
> > > >
> > > >
> > > >
> > > > Regards,
> > > > Zhuo.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >  Replied Message 
> > > > | From | Aleksandr Pilipenko |
> > > > | Date | 06/4/2024 18:59 |
> > > > | To |  |
> > > > | Subject | Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo |
> > > > Congratulations, Weijie!
> > > >
> > > > Best,
> > > > Aleksandr
> > > >
> > > > On Tue, 4 Jun 2024 at 11:42, Abdulquddus Babatunde Ekemode <
> > > > abdulqud...@aligence.io> wrote:
> > > >
> > > > Congratulations! I wish you all the best.
> > > >
> > > > Best Regards,
> > > > Abdulquddus
> > > >
> > > > On Tue, 4 Jun 2024 at 13:14, Ahmed Hamdy wrote:
> > > >
> > > > Congratulations Weijie
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > >
> > > > On Tue, 4 Jun 2024 at 10:51, Matthias Pohl wrote:
> > > >
> > > > Congratulations, Weijie!
> > > >
> > > > Matthias
> > > >
> > > > On Tue, Jun 4, 2024 at 11:12 AM Guowei Ma wrote:
> > > >
> > > > Congratulations!
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > >
> > > > On Tue, Jun 4, 2024 at 4:55 PM gongzhongqiang <gongzhongqi...@apache.org> wrote:
> > > >
> > > > Congratulations, Weijie!
> > > >
> > > > Best,
> > > > Zhongqiang Gong
> > > >
> > > > On Tue, Jun 4, 2024 at 14:46, Xintong Song wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > On behalf of the PMC, I'm very happy to announce that Weijie Guo has
> > > > joined the Flink PMC!
> > > >
> > > > Weijie has been an active member of the Apache Flink community for many
> > > > years. He has made significant contributions in many components,
> > > > including runtime, shuffle, SDK, connectors, etc. He has driven or
> > > > participated in many FLIPs, authored and reviewed hundreds of PRs, been
> > > > consistently active on the mailing lists, and also helped with release
> > > > management of 1.20 and several other bugfix releases.
> > > >
> > > > Congratulations and welcome Weijie!
> > > >
> > > > Best,
> > > >
> > > > Xintong (on behalf of the Flink PMC)
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
>


Re: [VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-26 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Fri, Apr 26, 2024 at 14:47, Zhu Zhu wrote:

> +1 (binding)
>
> Thanks,
> Zhu
>
> On Fri, Apr 26, 2024 at 13:11, Ron Liu wrote:
>
> > +1(binding)
> >
> > Best,
> > Ron
> >
> > On Fri, Apr 26, 2024 at 12:55, Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Rui
> > >
> > > On Fri, Apr 26, 2024 at 10:26 AM Muhammet Orazov wrote:
> > >
> > > > Hey Xia,
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thanks and best,
> > > > Muhammet
> > > >
> > > > On 2024-04-26 02:21, Xia Sun wrote:
> > > > > Hi everyone,
> > > > >
> > > > > I'd like to start a vote on FLIP-445: Support dynamic parallelism
> > > > > inference for HiveSource[1], which has been discussed in this thread [2].
> > > > >
> > > > > The vote will be open for at least 72 hours unless there is an
> > > > > objection or not enough votes.
> > > > >
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
> > > > > [2] https://lists.apache.org/thread/4k1qx6lodhbkknkhjyl0lq9bx8fcpjvn
> > > > >
> > > > >
> > > > > Best,
> > > > > Xia
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-17 Thread Lijie Wang
Thanks for driving the discussion.

+1 for the proposal and +1 for the `InferMode.NONE` option.

Best,
Lijie

On Thu, Apr 18, 2024 at 11:36, Ron liu wrote:

> Hi, Xia
>
> Thanks for driving this FLIP.
>
> This proposal looks good to me overall. However, I have the following minor
> questions:
>
> 1. The FLIP introduces `table.exec.hive.infer-source-parallelism.mode` as a
> new parameter whose value is the enum class `InferMode`. I think the
> `InferMode` class should also be listed in the Public Interfaces section.
> 2. You mentioned in the FLIP that the default value of
> `table.exec.hive.infer-source-parallelism.max` is 1024, but in the code the
> default value appears to be 1000. Which one is correct?
> 3. I also agree with Muhammet's idea that there is no need to introduce the
> option `table.exec.hive.infer-source-parallelism.enabled`, and that
> expanding the `InferMode` values will fulfill the need. There is another
> issue to consider here, though: how will
> `table.exec.hive.infer-source-parallelism` and
> `table.exec.hive.infer-source-parallelism.mode` stay compatible?
> 4. FLIP-367 supports setting the parallelism of a source individually. If
> HiveSource also supports this feature in the future, while the default
> value of `table.exec.hive.infer-source-parallelism.mode` is
> `InferMode.DYNAMIC`, will the parallelism be dynamically inferred, or will
> the manually set parallelism take effect? Which one has the higher
> priority?
>
> Best,
> Ron
>
> On Wed, Apr 17, 2024 at 12:08, Xia Sun wrote:
>
> > Hi Jeyhun, Muhammet,
> > Thanks for all the feedback!
> >
> > > Could you please mention the default values for the new configurations
> > > (e.g., table.exec.hive.infer-source-parallelism.mode,
> > > table.exec.hive.infer-source-parallelism.enabled,
> > > etc) ?
> >
> >
> > Thanks for your suggestion. I have supplemented the explanation regarding
> > the default values.
> >
> > > Since we are introducing the mode as a configuration option,
> > > could it make sense to have `InferMode.NONE` option also?
> > > The `NONE` option would disable the inference.
> >
> >
> > This is a good idea. Looking ahead, it could eliminate the need for
> > introducing a new configuration option. I haven't identified any potential
> > compatibility issues yet. If there are no further ideas from others, I'll
> > go ahead and update the FLIP to introduce InferMode.NONE.
> >
> > Best,
> > Xia
> >
> > On Wed, Apr 17, 2024 at 10:31, Muhammet Orazov wrote:
> >
> > > Hello Xia,
> > >
> > > Thanks for the FLIP!
> > >
> > > Since we are introducing the mode as a configuration option,
> > > could it make sense to have `InferMode.NONE` option also?
> > > The `NONE` option would disable the inference.
> > >
> > > This way we deprecate the `table.exec.hive.infer-source-parallelism`
> > > and no additional `table.exec.hive.infer-source-parallelism.enabled`
> > > option is required.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Muhammet
> > >
> > > On 2024-04-16 07:07, Xia Sun wrote:
> > > > Hi everyone,
> > > > I would like to start a discussion on FLIP-445: Support dynamic
> > > > parallelism inference for HiveSource[1].
> > > >
> > > > FLIP-379[2] has introduced dynamic source parallelism inference for
> > > > batch jobs, which can utilize runtime information to decide the source
> > > > parallelism more accurately. As a follow-up task, we plan to implement
> > > > the dynamic parallelism inference interface for HiveSource, and also
> > > > switch the default from static parallelism inference to dynamic
> > > > parallelism inference.
> > > >
> > > > Looking forward to your feedback and suggestions, thanks.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> > > >
> > > > Best regards,
> > > > Xia
> > >
> >
>
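The option semantics debated in the FLIP-445 thread (Muhammet's `InferMode.NONE` proposal, and Ron's questions 3 and 4 on legacy-flag compatibility and on priority) can be sketched as a small resolution function. This is a hypothetical model, not the HiveSource implementation: the class, method, and `Decision` names are invented for illustration, and it assumes that an explicitly set source parallelism takes precedence over inference, that turning off the legacy `table.exec.hive.infer-source-parallelism` flag behaves like `InferMode.NONE`, and that `table.exec.hive.infer-source-parallelism.max` caps the inferred value.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of how a Hive source could resolve its parallelism from the
 * options discussed for FLIP-445. All names are illustrative assumptions,
 * not Flink's actual API.
 */
final class HiveParallelismResolution {

    /** Modes discussed in the thread; NONE is the proposed "disable inference" value. */
    enum InferMode { STATIC, DYNAMIC, NONE }

    /** Where the final source parallelism comes from. */
    enum Decision { USER_SPECIFIED, STATIC_INFERENCE, DYNAMIC_INFERENCE, DEFAULT_PARALLELISM }

    private HiveParallelismResolution() {}

    /**
     * @param userParallelism    parallelism set explicitly on the source (FLIP-367 style), if any
     * @param legacyInferEnabled value of the legacy boolean table.exec.hive.infer-source-parallelism
     * @param mode               value of table.exec.hive.infer-source-parallelism.mode
     */
    static Decision resolve(OptionalInt userParallelism, boolean legacyInferEnabled, InferMode mode) {
        // Assumption: a manually configured parallelism always wins over inference.
        if (userParallelism.isPresent()) {
            return Decision.USER_SPECIFIED;
        }
        // Assumption: turning the legacy flag off is equivalent to InferMode.NONE.
        InferMode effective = legacyInferEnabled ? mode : InferMode.NONE;
        switch (effective) {
            case STATIC:
                return Decision.STATIC_INFERENCE;
            case DYNAMIC:
                return Decision.DYNAMIC_INFERENCE;
            case NONE:
            default:
                return Decision.DEFAULT_PARALLELISM;
        }
    }

    /** Assumed role of the ".max" option: clamp an inferred value (e.g. a split count) to [1, max]. */
    static int capInferred(int inferred, int max) {
        return Math.max(1, Math.min(inferred, max));
    }
}
```

In this model, `resolve(OptionalInt.empty(), true, InferMode.DYNAMIC)` yields `DYNAMIC_INFERENCE`, and `capInferred(5000, 1000)` yields 1000. The real default of the cap is exactly what question 2 asks about, so 1000 is only an example value here.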


Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee

2024-04-12 Thread Lijie Wang
Congratulations, Lincoln!

Best,
Lijie

On Fri, Apr 12, 2024 at 18:16,  wrote:

> Congratulations Lincoln!
> On Apr 12, 2024 at 12:08 +0200, yuxia wrote:
> > Congratulations, Lincoln!
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Feng Jin"
> > To: "dev"
> > Sent: Friday, April 12, 2024 5:23:40 PM
> > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee
> >
> > Congratulations, Lincoln!
> >
> > Best,
> > Feng Jin
> >
> > On Fri, Apr 12, 2024 at 5:20 PM xiangyu feng wrote:
> >
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Xiangyu Feng
> > >
> > > On Fri, Apr 12, 2024 at 17:19, Feifan Wang wrote:
> > >
> > > > > Congratulations, Lincoln!
> > > > >
> > > > >
> > > > > ——
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Feifan Wang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > At 2024-04-12 15:59:00, "Jark Wu"  wrote:
> > > > > > >Hi everyone,
> > > > > > >
> > > > > > > >On behalf of the PMC, I'm very happy to announce that Lincoln Lee has
> > > > > > > >joined the Flink PMC!
> > > > > > > >
> > > > > > > >Lincoln has been an active member of the Apache Flink community for
> > > > > > > >many years. He mainly works on the Flink SQL component and has
> > > > > > > >driven/pushed many FLIPs around SQL, including FLIP-282/373/415/435 in
> > > > > > > >recent versions. He has a great technical vision of Flink SQL and has
> > > > > > > >participated in plenty of discussions on the dev mailing list. Besides
> > > > > > > >that, he is community-minded, e.g. serving as the release manager of
> > > > > > > >1.19, verifying releases, managing release syncs, and writing the
> > > > > > > >release announcement.
> > > > > > >
> > > > > > >Congratulations and welcome Lincoln!
> > > > > > >
> > > > > > >Best,
> > > > > > >Jark (on behalf of the Flink PMC)
> > > > >
> > >
>


Re: [VOTE] FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI

2024-04-11 Thread Lijie Wang
+1 (binding)

Thanks for driving.

Best,
Lijie

On Fri, Apr 12, 2024 at 11:08, Zakelly Lan wrote:

> +1 non-binding
>
>
> Best,
> Zakelly
>
> On Fri, Apr 12, 2024 at 11:05 AM Yuepeng Pan wrote:
>
> > Hi Rui,
> >
> > Thanks for driving it!
> > +1  (non-binding)
> > Best,
> > Yuepeng Pan
> >
> > At 2024-04-12 10:31:19, "Yanfei Lei"  wrote:
> > >Hi Rui,
> > >
> > >Thanks for driving it!
> > >
> > >+1  (binding)
> > >
> > >On Fri, Apr 12, 2024 at 10:26, Hangxiang Yu wrote:
> > >>
> > >> +1  (binding)
> > >>
> > >> On Fri, Apr 12, 2024 at 10:22 AM Jinzhong Li <lijinzhong2...@gmail.com> wrote:
> > >>
> > >> > +1  (non binding)
> > >> >
> > >> > Bests,
> > >> > Jinzhong
> > >> >
> > >> > On Thu, Apr 11, 2024 at 7:26 AM Muhammet Orazov wrote:
> > >> >
> > >> > > Hey Rui,
> > >> > >
> > >> > > +1 (non-binding).
> > >> > >
> > >> > > Thanks for driving it!
> > >> > >
> > >> > > Best,
> > >> > > Muhammet
> > >> > >
> > >> > > On 2024-04-10 04:36, Rui Fan wrote:
> > >> > > > Hi devs,
> > >> > > >
> > >> > > > Thank you to everyone for the feedback on FLIP-441: Show
> > >> > > > the JobType and remove Execution Mode on Flink WebUI[1]
> > >> > > > which has been discussed in this thread [2].
> > >> > > >
> > >> > > > I would like to start a vote for it. The vote will be open for at
> > >> > > > least 72 hours unless there is an objection or not enough votes.
> > >> > > >
> > >> > > > [1] https://cwiki.apache.org/confluence/x/agrPEQ
> > >> > > > [2] https://lists.apache.org/thread/0s52w17w24x7m2zo6ogl18t1fy412vcd
> > >> > > >
> > >> > > > Best,
> > >> > > > Rui
> > >> > >
> > >> >
> > >>
> > >>
> > >> --
> > >> Best,
> > >> Hangxiang.
> > >
> > >
> > >
> > >--
> > >Best,
> > >Yanfei
> >
>


Re: Question around Flink's AdaptiveBatchScheduler

2024-02-29 Thread Lijie Wang
Hi Venkat,

>> default-source-parallelism config should be independent from the
>> max-parallelism

Actually, it's not.

Firstly, the parallelism should obviously be less than or equal to the max
parallelism (both by definition and at execution time). The
"jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
max parallelism of a vertex if you don't set the max parallelism for it
individually (just like the source in your case).

Secondly, we first need to limit the max parallelism of the downstream
vertex, and only then can we decide how many subpartitions the upstream
vertex should produce. The limit must be effective; otherwise, some
downstream tasks will have no data to process. A source could actually
ignore this limit because it has no upstream, but that would lead to
semantic inconsistency.

Best,
Lijie

On Thu, Feb 29, 2024 at 05:49, Venkatakrishnan Sowrirajan wrote:

> Hi Flink devs,
>
> With Flink's AdaptiveBatchScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> (note: this is different from the AdaptiveScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> the scheduler automatically determines the correct number of downstream
> tasks required to process the shuffle generated by the upstream vertex.
>
> I have a question regarding the current behavior. There are 2 configs that
> interplay here:
> 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> - The default parallelism of data source.
> 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> - Upper bound of allowed parallelism to set adaptively.
>
> Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism",
> the Flink application fails with the following message:
>
> "Vertex's parallelism should be smaller than or equal to vertex's max
> parallelism."
>
> This is the corresponding code in Flink's DefaultVertexParallelismInfo
> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> My question is: shouldn't the "default-source-parallelism" config be
> independent of the "max-parallelism" config? The former controls the
> default source parallelism, while the latter controls the max number of
> partitions to write the intermediate shuffle.
>
> If this is true, then the above check should be fixed. Otherwise, I wanted
> to understand why the "default-source-parallelism" must not exceed the
> "max-parallelism".
>
> Thanks
> Venkat
>
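The constraint Lijie describes in the AdaptiveBatchScheduler thread can be modelled in a few lines. This is a hypothetical sketch, not Flink's scheduler code: the class and method names are invented, and only the exception text is taken from the thread. It assumes, as the FLINK-33968 description in this archive states, that while the downstream parallelism is still undecided each upstream task produces as many subpartitions as the downstream max parallelism, which is why a downstream parallelism above that bound would leave tasks without data.

```java
/**
 * Hypothetical model of the parallelism constraint discussed for the
 * AdaptiveBatchScheduler. Method names are illustrative; the exception
 * text is the one quoted in the thread.
 */
final class AdaptiveBatchParallelismSketch {

    private AdaptiveBatchParallelismSketch() {}

    /**
     * A vertex without its own max parallelism (such as the source in the question)
     * falls back to jobmanager.adaptive-batch-scheduler.max-parallelism.
     */
    static int effectiveMaxParallelism(Integer vertexMaxParallelism, int schedulerMaxParallelism) {
        return vertexMaxParallelism != null ? vertexMaxParallelism : schedulerMaxParallelism;
    }

    /** Same rule as the failing check: parallelism must not exceed max parallelism. */
    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    /**
     * Upstream tasks produce downstreamMaxParallelism subpartitions (assumption taken
     * from FLINK-33968), so any downstream subtasks beyond that count receive no data.
     */
    static int idleDownstreamTasks(int downstreamParallelism, int downstreamMaxParallelism) {
        return Math.max(0, downstreamParallelism - downstreamMaxParallelism);
    }
}
```

With a scheduler-wide max of 128 and no per-vertex override, a default source parallelism of 256 is rejected by `checkParallelism`, which mirrors the failure Venkat reports; if the check were skipped, `idleDownstreamTasks(200, 128)` shows that 72 subtasks would have nothing to read.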


Re: [VOTE] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-24 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Thu, Jan 25, 2024 at 10:06, Yanfei Lei wrote:

> +1 (binding)
>
> On Thu, Jan 25, 2024 at 10:00, Hangxiang Yu wrote:
> >
> > +1 (binding)
> >
> > On Thu, Jan 25, 2024 at 8:49 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Rui
> > >
> > > On Wed, 24 Jan 2024 at 21:50, Zakelly Lan wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to start a vote on FLIP-406: Reorganize State &
> > > > Checkpointing & Recovery Configuration [1]. The discussion thread is
> > > > here [2].
> > > >
> > > > The vote will be open for at least 72 hours unless there is an
> > > > objection or insufficient votes.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560
> > > > [2] https://lists.apache.org/thread/0oc10cr2q2ms855dbo29s7v08xs3bvqg
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
>
>
>
> --
> Best,
> Yanfei
>


Re: [DISCUSS] FLIP-417: Expose JobManagerOperatorMetrics via REST API

2024-01-17 Thread Lijie Wang
Hi Mason,

Thanks for driving the discussion. +1 for the proposal.

How about returning the metrics of all operators in a vertex (i.e., the
response is a map of operatorId/operatorName -> operator metrics)?
Correspondingly, the URL could be changed to
/jobs//vertices//operator-metrics

This way, users can skip the step of obtaining the operator ID.

Best,
Lijie

On Wed, Jan 17, 2024 at 10:31, Hang Ruan wrote:

> Hi, Mason.
>
> The field `operatorName` in JobManagerOperatorQueryScopeInfo follows the
> fields in OperatorQueryScopeInfo, which uses the operatorName instead of the
> OperatorID.
> It is fine on my side to change from operatorName to operatorID in this
> FLIP.
>
> Best,
> Hang
>
> On Wed, Jan 17, 2024 at 09:39, Mason Chen wrote:
>
> > Hi Xuyang and Hang,
> >
> > Thanks for your support and feedback! See my responses below:
> >
> > > 1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> > > operator ID can be converted from the vertex ID[1]. Therefore, it is
> > > somewhat strange to have both vertex ID and operator ID in a single URL.
> > >
> > I think Hang explained it perfectly. Essentially, a vertex may contain one
> > or more operators, so the operator ID is required to distinguish them.
> >
> > > 2. If I misunderstood the semantics of operator IDs here, then what is
> > > the relationship between vertex ID and operator ID, and do we need a URL
> > > like `/jobs//vertices//operators/` to list all operator IDs under this
> > > vertex?
> > >
> > Good question; we definitely need to expose operator IDs through the REST
> > API to make this usable. I'm looking at how users would currently discover
> > the vertex ID to query. From the supported REST APIs [1], you can currently
> > obtain it from
> >
> > 1. `/jobs/`
> > 2. `/jobs//plan`
> >
> > The responses of both these APIs include the vertex IDs (the vertices AND
> > nodes fields), but not the operator IDs. We would need to add the logic to
> > the plan generation [2]. The response is a little confusing because there
> > is a field in the vertices called "operator name". I propose to add a new
> > field called "operators" to the vertex response object, which would be a
> > list of objects with the structure
> >
> > Operator
> > {
> >   "id": "THE-FLINK-GENERATED-ID"
> > }.
> >
> > > The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID
> > > and operatorName. So we should use the operator name in the API.
> > > If you think we should use the operator ID, more changes will be needed.
> > >
> > I think we should use the operator ID since it uniquely identifies an
> > operator; on the contrary, the operator name does not (it may be empty or
> > repeated between operators by the user). I actually had a question on that
> > since you implemented the metric group: what's the reason we use the
> > operator name currently? Could it also use the operator ID so we can match
> > against the ID?
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
> > [2] https://github.com/apache/flink/blob/416cb7aaa02c176e01485ff11ab4269f76b5e9e2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java#L53
> >
> > Best,
> > Mason
> >
> >
> > On Thu, Jan 11, 2024 at 10:54 PM Hang Ruan wrote:
> >
> > > Hi, Mason.
> > >
> > > Thanks for driving this FLIP.
> > >
> > > The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID and
> > > operatorName. So we should use the operator name in the API.
> > > If you think we should use the operator ID, more changes will be needed.
> > >
> > > Regarding Xuyang's questions, we add both vertexID and operatorID
> > > information because of operator chaining.
> > > An operator chain has one vertexID and contains many different operators.
> > > The operator information helps to distinguish them within the same
> > > operator chain.
> > >
> > > Best,
> > > Hang
> > >
> > >
> > > On Fri, Jan 12, 2024 at 10:21, Xuyang wrote:
> > >
> > > > Hi, Mason.
> > > > Thanks for driving this FLIP. I think it's important for external
> > > > systems to be able to perceive the metrics of the operator coordinator.
> > > > +1 for it.
> > > >
> > > >
> > > > I just have the following minor questions and am looking forward to
> > > > your reply. Please forgive me if I have some misunderstandings.
> > > >
> > > >
> > > > 1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> > > > operator ID can be converted from the vertex ID[1]. Therefore, it is
> > > > somewhat strange to have both vertex ID and operator ID in a single URL.
> > > >
> > > >
> > > > 2. If I misunderstood the semantics of operator IDs here, then what is
> > > > the relationship between vertex ID and operator ID, and do we need a URL
> > > > like `/jobs//vertices//operators/` to list all operator IDs under this
> > > > vertex?
> > > >
> > > >
> > > >
> > > >
> > > > [1]
> > > >
> > >
>
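Hang's and Mason's answers to Xuyang rest on one structural point: a job vertex can hold a chain of several operators, so a vertex ID alone cannot address one operator, and operator names are not a reliable key. The toy model below makes that concrete by attaching a list of operators to a vertex, roughly like the "operators" field Mason proposes for the plan response. It is hypothetical: `VertexInfo`, `OperatorInfo`, `findByName`, and `findById` are invented names, not Flink's `JobVertex`/`OperatorID` classes, and it assumes operator IDs are unique within a vertex.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a job vertex that holds a chain of operators.
 * Names are illustrative, not Flink's JobVertex/OperatorID classes.
 */
final class ChainedVertexSketch {

    static final class OperatorInfo {
        final String id;   // generated ID, assumed unique within the vertex
        final String name; // user-facing name; may be empty or repeated

        OperatorInfo(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static final class VertexInfo {
        final String vertexId;
        final List<OperatorInfo> operators; // similar in spirit to the proposed "operators" field

        VertexInfo(String vertexId, List<OperatorInfo> operators) {
            this.vertexId = vertexId;
            this.operators = operators;
        }
    }

    private ChainedVertexSketch() {}

    /** Name lookup: can match zero, one, or several operators of the chain. */
    static List<OperatorInfo> findByName(VertexInfo vertex, String name) {
        List<OperatorInfo> matches = new ArrayList<>();
        for (OperatorInfo op : vertex.operators) {
            if (op.name.equals(name)) {
                matches.add(op);
            }
        }
        return matches;
    }

    /** ID lookup: matches at most one operator (given unique IDs); null if absent. */
    static OperatorInfo findById(VertexInfo vertex, String id) {
        for (OperatorInfo op : vertex.operators) {
            if (op.id.equals(id)) {
                return op;
            }
        }
        return null;
    }
}
```

For a vertex chaining two operators both named "Map" plus one unnamed operator, `findByName(v, "Map")` returns two candidates while `findById` pins down exactly one, which is the argument for keying the endpoint by operator ID.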

Re: Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-09 Thread Lijie Wang
+1 (non-binding)

Best,
Lijie

On Tue, Jan 9, 2024 at 19:28, Jiabao Sun wrote:

> +1 (non-binding)
>
> Best,
> Jiabao
>
>
> On 2024/01/09 09:58:04 xiangyu feng wrote:
> > +1 (non-binding)
> >
> > Regards,
> > Xiangyu Feng
> >
> > On Tue, Jan 9, 2024 at 17:50, Danny Cranmer wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks,
> > > Danny
> > >
> > > On Tue, Jan 9, 2024 at 9:31 AM Feng Jin  wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Feng Jin
> > > >
> > > > On Tue, Jan 9, 2024 at 5:29 PM Yuxin Tan  wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > > Yuxin
> > > > >
> > > > >
> > > > > On Tue, Jan 9, 2024 at 17:25, Márton Balassi wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Tue, Jan 9, 2024 at 10:15 AM Leonard Xu wrote:
> > > > > >
> > > > > > > +1(binding)
> > > > > > >
> > > > > > > Best,
> > > > > > > Leonard
> > > > > > >
> > > > > > > > On Jan 9, 2024, at 5:08 PM, Yangze Guo wrote:
> > > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
> > > > > > > > On Tue, Jan 9, 2024 at 5:06 PM Robert Metzger <rmetz...@apache.org> wrote:
> > > > > > > >>
> > > > > > > >> +1 (binding)
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Tue, Jan 9, 2024 at 9:54 AM Guowei Ma wrote:
> > > > > > > >>
> > > > > > > >>> +1 (binding)
> > > > > > > >>> Best,
> > > > > > > >>> Guowei
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Tue, Jan 9, 2024 at 4:49 PM Rui Fan <19...@gmail.com> wrote:
> > > > > > > >>>
> > > > > > >  +1 (non-binding)
> > > > > > > 
> > > > > > >  Best,
> > > > > > >  Rui
> > > > > > > 
> > > > > > >  On Tue, Jan 9, 2024 at 4:41 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > > > > > 
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Hang
> > > > > > > >
> > > > > > > > On Tue, Jan 9, 2024 at 16:25, gongzhongqiang wrote:
> > > > > > > >
> > > > > > > >> +1 non-binding
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Zhongqiang
> > > > > > > >>
> > > > > > > >> On Tue, Jan 9, 2024 at 15:05, Leonard Xu wrote:
> > > > > > > >>
> > > > > > > >>> Hello all,
> > > > > > > >>>
> > > > > > > > >>> This is the official vote on whether to accept the Flink CDC code
> > > > > > > > >>> contribution to Apache Flink.
> > > > > > > > >>>
> > > > > > > > >>> The current Flink CDC code, documentation, and website can be
> > > > > > > > >>> found here:
> > > > > > > > >>> code: https://github.com/ververica/flink-cdc-connectors
> > > > > > > > >>> docs: https://ververica.github.io/flink-cdc-connectors/
> > > > > > > > >>>
> > > > > > > > >>> This vote should capture whether the Apache Flink community is
> > > > > > > > >>> interested in accepting, maintaining, and evolving Flink CDC.
> > > > > > > > >>>
> > > > > > > > >>> Regarding my original proposal[1] on the dev mailing list, I firmly
> > > > > > > > >>> believe that this initiative aligns perfectly with Flink. For the
> > > > > > > > >>> Flink community, it represents an opportunity to bolster Flink's
> > > > > > > > >>> competitive edge in streaming data integration, fostering the
> > > > > > > > >>> robust growth and prosperity of the Apache Flink ecosystem. For the
> > > > > > > > >>> Flink CDC project, becoming a sub-project of Apache Flink means
> > > > > > > > >>> becoming an integral part of a neutral open-source community,
> > > > > > > > >>> capable of attracting a more diverse pool of contributors.
> > > > > > > > >>>
> > > > > > > > >>> All Flink CDC maintainers are dedicated to continuously
> > > > > > > > >>> contributing to achieve seamless integration with Flink.
> > > > > > > > >>> Additionally, PMC members like Jark, Qingsheng, and I are willing
> > > > > > > > >>> to facilitate the expansion of contributors and committers to
> > > > > > > > >>> effectively maintain this new sub-project.
> > > > > > > > >>>
> > > > > > > > >>> This is an "Adoption of a new Codebase" vote as per the Flink
> > > > > > > > >>> bylaws [2]. Only PMC votes are binding. The vote will be open for
> > > > > > > > >>> at least 7 days (excluding weekends), meaning until Thursd

Re: [ANNOUNCE] New Apache Flink Committer - Alexander Fedulov

2024-01-02 Thread Lijie Wang
Congratulations, Alex!

Best,
Lijie

On Wed, Jan 3, 2024 at 13:41, Romit Mahanta wrote:

> Happy New Year & congratulations Alex!
>
> Best,
>
> R
>
> On Tue, 2 Jan, 2024, 5:45 pm Maximilian Michels,  wrote:
>
> > Happy New Year everyone,
> >
> > I'd like to start the year off by announcing Alexander Fedulov as a
> > new Flink committer.
> >
> > Alex has been active in the Flink community since 2019. He has
> > contributed more than 100 commits to Flink, its Kubernetes operator,
> > and various connectors [1][2].
> >
> > Especially noteworthy are his contributions on deprecating and
> > migrating the old Source API functions and test harnesses, the
> > enhancement to flame graphs, the dynamic rescale time computation in
> > Flink Autoscaling, as well as all the small enhancements Alex has
> > contributed which make a huge difference.
> >
> > Beyond code contributions, Alex has been an active community member
> > with his activity on the mailing lists [3][4], as well as various
> > talks and blog posts about Apache Flink [5][6].
> >
> > Congratulations Alex! The Flink community is proud to have you.
> >
> > Best,
> > The Flink PMC
> >
> > [1] https://github.com/search?type=commits&q=author%3Aafedulov+org%3Aapache
> > [2] https://issues.apache.org/jira/browse/FLINK-28229?jql=status%20in%20(Resolved%2C%20Closed)%20AND%20assignee%20in%20(afedulov)%20ORDER%20BY%20resolved%20DESC%2C%20created%20DESC
> > [3] https://lists.apache.org/list?dev@flink.apache.org:lte=100M:Fedulov
> > [4] https://lists.apache.org/list?u...@flink.apache.org:lte=100M:Fedulov
> > [5] https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/
> > [6] https://www.ververica.com/blog/presenting-our-streaming-concepts-introduction-to-flink-video-series
> >
>


[jira] [Created] (FLINK-33968) Compute the number of subpartitions when initializing execution job vertices

2024-01-02 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-33968:
--

 Summary: Compute the number of subpartitions when initializing execution job vertices
 Key: FLINK-33968
 URL: https://issues.apache.org/jira/browse/FLINK-33968
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
Assignee: Lijie Wang


Currently, when using dynamic graphs, the number of subpartitions (subpartition-num)
of a task is calculated lazily, at task deployment time. This may lead to some
uncertainty in job recovery scenarios.

Before the JM crashes, when upstream tasks are deployed, the parallelism of the
downstream job vertex may still be unknown, so the subpartition-num will be the max
parallelism of the downstream job vertex. However, after the JM restarts, when
upstream tasks are deployed, the parallelism of the downstream job vertex may
already be known (it was calculated before the JM crashed and recovered after the
JM restarted), so the subpartition-num will be the actual parallelism of the
downstream job vertex.

This difference in the calculated subpartition-num means that partitions
generated before the JM crashes cannot be reused after the JM restarts.

We will solve this problem by moving the calculation of subpartition-num forward
to the moment the execution job vertex is initialized (in the constructor of
IntermediateResultPartition).
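To make the failure mode concrete, here is a minimal plain-Java sketch of the two timing choices. It is not Flink code: `subpartitionNum`, `PartitionSketch` and the numbers 128 and 10 are hypothetical, and the rule "use the decided downstream parallelism, else the downstream max parallelism" is taken from the description above.

```java
import java.util.OptionalInt;

final class SubpartitionNumSketch {

    /** Rule from the ticket: decided downstream parallelism if known, else downstream max parallelism. */
    static int subpartitionNum(OptionalInt decidedDownstreamParallelism, int downstreamMaxParallelism) {
        return decidedDownstreamParallelism.orElse(downstreamMaxParallelism);
    }

    /**
     * Eager variant (hypothetical stand-in for computing the value in the partition's constructor):
     * the count is fixed once when the partition object is created and is not recomputed at deployment.
     */
    static final class PartitionSketch {
        final int numSubpartitions;

        PartitionSketch(OptionalInt decidedDownstreamParallelism, int downstreamMaxParallelism) {
            this.numSubpartitions = subpartitionNum(decidedDownstreamParallelism, downstreamMaxParallelism);
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 128;

        // Lazy: evaluated when the upstream task is deployed.
        int beforeCrash = subpartitionNum(OptionalInt.empty(), maxParallelism); // downstream not decided yet
        int afterRestart = subpartitionNum(OptionalInt.of(10), maxParallelism); // decided value was recovered
        System.out.println(beforeCrash + " vs " + afterRestart); // 128 vs 10: old partitions cannot be reused

        // Eager: the value is captured once at initialization and only read back at deployment.
        PartitionSketch partition = new PartitionSketch(OptionalInt.empty(), maxParallelism);
        System.out.println(partition.numSubpartitions); // 128, independent of when deployment happens
    }
}
```

The sketch only shows why computing the value earlier removes its dependence on deployment timing; whether the initialization-time value is identical before and after a restart depends on how recovery rebuilds the graph, which is not modeled here.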



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-26 Thread Lijie Wang
Hi Zakelly,

>> I'm wondering if `execution.checkpointing.savepoint-dir` would be better.

`execution.checkpointing.dir` and `execution.checkpointing.savepoint-dir`
also work for me.
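Whichever names are chosen, existing configurations that still use `state.checkpoints.dir` / `state.savepoints.dir` presumably need to keep working during the migration. Flink's `ConfigOption` supports this through `withDeprecatedKeys(...)`; the plain-Java sketch below only imitates that lookup order (new key first, then deprecated keys) and is not Flink's implementation. The new key names are the candidates from this thread, not settled names.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class FallbackKeySketch {

    /** The new key wins; otherwise the first deprecated key that is set is used. */
    static Optional<String> get(Map<String, String> conf, String newKey, List<String> deprecatedKeys) {
        if (conf.containsKey(newKey)) {
            return Optional.of(conf.get(newKey));
        }
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return Optional.of(conf.get(oldKey));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A user configuration that still uses the old key names.
        Map<String, String> conf = Map.of(
                "state.checkpoints.dir", "hdfs:///flink/checkpoints",
                "state.savepoints.dir", "hdfs:///flink/savepoints");

        System.out.println(get(conf, "execution.checkpointing.dir",
                List.of("state.checkpoints.dir")).orElse("<unset>")); // hdfs:///flink/checkpoints
        System.out.println(get(conf, "execution.checkpointing.savepoint-dir",
                List.of("state.savepoints.dir")).orElse("<unset>")); // hdfs:///flink/savepoints
    }
}
```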

>> So I think an enumeration option `execution.checkpointing.mode` which
>> can be 'full' (default) or 'incremental' would be better

I agree with using an enumeration option. But currently there is already a
configuration option called `execution.checkpointing.mode`, which is used
to choose EXACTLY_ONCE or AT_LEAST_ONCE. Maybe we need to use another name
or merge these two options.
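A small plain-Java sketch of why the name clashes: a single key can only be parsed into one enum type. `CheckpointingType` and the key `execution.checkpointing.type` are hypothetical placeholders for illustration, not names proposed in this thread, and the case-insensitive parsing is an assumption of the sketch.

```java
import java.util.Locale;
import java.util.Map;

final class EnumOptionSketch {

    /** Values accepted by the existing execution.checkpointing.mode option. */
    enum CheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Hypothetical enum for the proposed full/incremental choice. */
    enum CheckpointingType { FULL, INCREMENTAL }

    /** Parses an enum-valued key (case-insensitively) or returns the default when the key is absent. */
    static <E extends Enum<E>> E getEnum(Map<String, String> conf, String key, Class<E> type, E defaultValue) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Enum.valueOf(type, raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
                "execution.checkpointing.mode", "EXACTLY_ONCE",
                "execution.checkpointing.type", "incremental"); // hypothetical key

        System.out.println(getEnum(conf, "execution.checkpointing.mode",
                CheckpointingMode.class, CheckpointingMode.EXACTLY_ONCE)); // EXACTLY_ONCE
        System.out.println(getEnum(conf, "execution.checkpointing.type",
                CheckpointingType.class, CheckpointingType.FULL)); // INCREMENTAL

        // Reading "execution.checkpointing.mode" as a CheckpointingType throws IllegalArgumentException,
        // because "EXACTLY_ONCE" is not a FULL/INCREMENTAL value: one key cannot serve both enums.
    }
}
```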

Best,
Lijie

Zakelly Lan wrote on Wed, Dec 27, 2023 at 11:43:

> Hi everyone,
>
> Thanks all for your comments!
>
> @Yanfei
>
> > 1. For some state backends that do not support incremental checkpoint,
> > how does the execution.checkpointing.incremental option take effect? Or
> > is it better to put incremental under state.backend.xxx.incremental?
> >
> I'd rather not put the option for incremental checkpoints under
> 'state.backend', since it is more about checkpointing than state
> access. Of course, the state backend may not necessarily take an incremental
> checkpoint as requested. If the state backend is not capable of taking
> incremental checkpoints, it is better to fall back to a full checkpoint.
>
> > 2. I'm a little worried that putting all configurations into
> > `ExecutionCheckpointingOptions` will introduce some dependency
> > problems. Some options would be used by flink-runtime module, but
> > flink-runtime should not depend on flink-streaming-java. e.g.
> > FLINK-28286[1].
> > So, I prefer to move configurations to `CheckpointingOptions`, WDYT?
> >
>
> Yes, that's a very good point. Moving to
> `CheckpointingOptions` (flink-core) makes sense.
>
> @Lijie
>
> > How about
> > state.savepoints.dir -> execution.checkpointing.savepoint.dir
> > state.checkpoints.dir -> execution.checkpointing.checkpoint.dir
>
>
> Actually, I think the `checkpointing.checkpoint` may cause some confusion.
> But I'm ok if others agree.
> I'm wondering if `execution.checkpointing.savepoint-dir` would be better.
> WDYT?
>
> > 2. We changed 'execution.checkpointing.local-copy' to
> > 'execution.checkpointing.local-copy.enabled'. Should we also add the "enabled"
> > suffix to other boolean configuration options? For example,
> > execution.checkpointing.incremental ->
> > execution.checkpointing.incremental.enabled
> >
>
> Actually, incremental checkpointing is more like choosing a checkpointing
> mode than enabling a feature. So I think an enumeration option
> `execution.checkpointing.mode` which can be 'full' (default) or
> 'incremental' would be better, WDYT?
> And @Rui Fan @Yanfei What do you think about this?
>
>
> On Tue, Dec 26, 2023 at 5:15 PM Lijie Wang wrote:
>
> > Hi Zakelly,
> >
> > Thanks for driving the discussion.
> >
> > 1.
> > >> But I'm not so sure since there is only one savepoint-related option.
> > >> Maybe someone else could share some thoughts here.
> >
> > How about
> > state.savepoints.dir -> execution.checkpointing.savepoint.dir
> > state.checkpoints.dir -> execution.checkpointing.checkpoint.dir
> >
> > 2. We changed 'execution.checkpointing.local-copy' to
> > 'execution.checkpointing.local-copy.enabled'. Should we also add the "enabled"
> > suffix to other boolean configuration options? For example,
> > execution.checkpointing.incremental ->
> > execution.checkpointing.incremental.enabled
> >
> > This would unify the naming style of configuration options and avoid
> > similar problems in the future (for example, we may need to add more
> > options for incremental checkpoints later).
> >
> > Best,
> > Lijie
> >
> > Yanfei Lei wrote on Tue, Dec 26, 2023 at 12:05:
> >
> > > Hi Zakelly,
> > >
> > > Thank you for creating the FLIP and starting the discussion.
> > >
> > > The current arrangement of these options is indeed somewhat haphazard,
> > > and the new arrangement looks much better. I have some questions about
> > > the arrangement of some new configuration options:
> > >
> > > 1. For some state backends that do not support incremental checkpoint,
> > > how does the execution.checkpointing.incremental option take effect? Or
> > > is it better to put incremental under state.backend.xxx.incremental?
> > >
> > > 2. I'm a little worried that putting all

Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-26 Thread Lijie Wang
Hi Zakelly,

Thanks for driving the discussion.

1.
>> But I'm not so sure since there is only one savepoint-related option.
>> Maybe someone else could share some thoughts here.

How about
state.savepoints.dir -> execution.checkpointing.savepoint.dir
state.checkpoints.dir -> execution.checkpointing.checkpoint.dir

2. We changed 'execution.checkpointing.local-copy' to
'execution.checkpointing.local-copy.enabled'. Should we also add the "enabled"
suffix to other boolean configuration options? For example,
execution.checkpointing.incremental ->
execution.checkpointing.incremental.enabled

This would unify the naming style of configuration options and avoid
similar problems in the future (for example, we may need to add more
options for incremental checkpoints later).
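The "similar problems" are key-prefix conflicts: as quoted further down in this thread (FLINK-29372), no option key should be a prefix of another. A minimal plain-Java check for that rule could look like this; treating a conflict as "key followed by '.' is a prefix of another key" is an assumption of the sketch, and this is not the check Flink's CI actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class KeyPrefixCheckSketch {

    /** Returns "a -> b" for every pair where key a followed by '.' is a prefix of key b. */
    static List<String> findPrefixConflicts(List<String> keys) {
        TreeSet<String> sorted = new TreeSet<>(keys); // sorted and de-duplicated for stable output
        List<String> conflicts = new ArrayList<>();
        for (String key : sorted) {
            for (String other : sorted) {
                if (other.startsWith(key + ".")) {
                    conflicts.add(key + " -> " + other);
                }
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        System.out.println(findPrefixConflicts(List.of(
                "execution.checkpointing.local-copy",
                "execution.checkpointing.local-copy.dir")));
        // [execution.checkpointing.local-copy -> execution.checkpointing.local-copy.dir]

        System.out.println(findPrefixConflicts(List.of(
                "execution.checkpointing.local-copy.enabled",
                "execution.checkpointing.local-copy.dir")));
        // []
    }
}
```

The same check would flag a boolean `execution.checkpointing.incremental` as soon as any `execution.checkpointing.incremental.*` option is added later, which is the situation the "enabled" suffix avoids.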

Best,
Lijie

Yanfei Lei wrote on Tue, Dec 26, 2023 at 12:05:

> Hi Zakelly,
>
> Thank you for creating the FLIP and starting the discussion.
>
> The current arrangement of these options is indeed somewhat haphazard,
> and the new arrangement looks much better. I have some questions about
> the arrangement of some new configuration options:
>
> 1. For some state backends that do not support incremental checkpoint,
> how does the execution.checkpointing.incremental option take effect? Or
> is it better to put incremental under state.backend.xxx.incremental?
>
> 2. I'm a little worried that putting all configurations into
> `ExecutionCheckpointingOptions` will introduce some dependency
> problems. Some options would be used by flink-runtime module, but
> flink-runtime should not depend on flink-streaming-java. e.g.
> FLINK-28286[1].
> So, I prefer to move configurations to `CheckpointingOptions`, WDYT?
>
> [1] https://issues.apache.org/jira/browse/FLINK-28286
>
> --
> Best,
> Yanfei
>
> Zakelly Lan wrote on Mon, Dec 25, 2023 at 21:14:
> >
> > Hi Rui Fan and Junrui,
> >
> > Thanks for the reminder! I agree to change the
> > 'execution.checkpointing.local-copy' to
> > 'execution.checkpointing.local-copy.enabled'.
> >
> > And for other suggestions Rui proposed:
> >
> > > 1. How about execution.checkpointing.storage.type instead
> > > of execution.checkpointing.storage?
> >
> >
> > Ah, I missed something here. Actually, I suggest we merge the current
> > 'state.checkpoints.dir' and 'state.checkpoint-storage' into one URI
> > configuration named 'execution.checkpointing.dir'. WDYT?
> >
> > > 3. execution.checkpointing.savepoint.dir is a little weird.
> > >
> >
> > Yes, I think it is better to make 'savepoint' and 'checkpoint' the same
> > level. But I'm not so sure since there is only one savepoint-related
> > option. Maybe someone else could share some thoughts here.
> >
> > > 4. How about execution.recovery.claim-mode instead of
> > > execution.recovery.mode?
> > >
> >
> > Agreed. That's more accurate.
> >
> >
> > Many thanks for your suggestions!
> >
> > Best,
> > Zakelly
> >
> > On Mon, Dec 25, 2023 at 8:18 PM Junrui Lee wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for driving this. I agree that the proposed restructuring of the
> > > configuration options is largely positive. It will make understanding and
> > > working with Flink configurations more intuitive.
> > >
> > > Most of the proposed changes look great. Just a heads-up, as Rui Fan
> > > mentioned, Flink currently requires that no configOption's key be the
> > > prefix of another to avoid issues when we eventually adopt a standard YAML
> > > parser, as detailed in FLINK-29372
> > > (https://issues.apache.org/jira/browse/FLINK-29372). Therefore, it's better
> > > to change the key 'execution.checkpointing.local-copy' because it serves as
> > > a prefix to the key 'execution.checkpointing.local-copy.dir'.
> > >
> > > Best regards,
> > > Junrui
> > >
> > > Rui Fan <1996fan...@gmail.com> wrote on Mon, Dec 25, 2023 at 19:11:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thank you for driving this proposal!
> > > >
> > > > Overall good for me. I have some questions about these names.
> > > >
> > > > 1. How about execution.checkpointing.storage.type instead of
> > > > execution.checkpointing.storage?
> > > >
> > > > It's similar to state.backend.type.
> > > >
> > > > 2. How about execution.checkpointing.local-copy.enabled instead of
> > > > execution.checkpointing.local-copy?
> > > >
> > > > You added a new option: execution.checkpointing.local-copy.dir.
> > > > IIUC, one option name shouldn't be the prefix of other options.
> > > > If you add a new option execution.checkpointing.local-copy,
> > > > Flink CI will fail directly.
> > > >
> > > > 3. execution.checkpointing.savepoint.dir is a little weird.
> > > >
> > > > For old options: state.savepoints.dir and state.checkpoints.dir,
> > > > the savepoint and checkpoint are at the same level. It means
> > > > it's either a checkpoint or a savepoint.
> > > >
> > > > The new option execution.checkpointing.dir is fine for me.
> > > > However, execution.checkpointing.savepoint.dir is a little weird.
> > > > I don't know which name is better now. Let us think about it m

[jira] [Created] (FLINK-33892) FLIP-383: Support Job Recovery for Batch Jobs

2023-12-19 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-33892:
--

 Summary: FLIP-383: Support Job Recovery for Batch Jobs
 Key: FLINK-33892
 URL: https://issues.apache.org/jira/browse/FLINK-33892
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Lijie Wang


This is the umbrella ticket for 
[FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-19 Thread Lijie Wang
Hi devs,

I'm happy to announce that FLIP-383 [1] has been approved unanimously.
According to the vote thread [2], there are 5 approving votes, 4 of which
are binding:

Xintong Song (binding)
Zhu Zhu (binding)
Weijie Guo (binding)
Rui Fan (binding)
Yuepeng Pan (non-binding)

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
[2] https://lists.apache.org/thread/vkmghnohx3tl6h19of43hs75c9tnxh4w

Best,
Lijie


Re: [VOTE] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs

2023-12-17 Thread Lijie Wang
+1 (binding)

Best,
Lijie

Yuxin Tan wrote on Fri, Dec 15, 2023 at 17:14:

> +1 (non-binding)
>
> Best,
> Yuxin
>
>
> weijie guo wrote on Fri, Dec 15, 2023 at 10:05:
>
> > +1(binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Wencong Liu wrote on Fri, Dec 15, 2023 at 09:13:
> >
> > > Hi dev,
> > >
> > > I'd like to start a vote on FLIP-382.
> > >
> > > Discussion thread:
> > > https://lists.apache.org/thread/3mgsc31odtpmzzl32s4oqbhlhxd0mn6b
> > > FLIP:
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
> > >
> > > Best regards,
> > > Wencong Liu
> >
>


[VOTE] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-13 Thread Lijie Wang
Hi devs,

Thanks for all the feedback on FLIP-383: Support Job Recovery for Batch
Jobs [1]. This FLIP was discussed in [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours (until December 19th 12:00 GMT) unless there is an objection or
insufficient votes.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
[2] https://lists.apache.org/thread/074z237c07vtj74685nxo6bttkq3kshz

Best,
Lijie


Re: [DISCUSS][2.0] FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs

2023-12-11 Thread Lijie Wang
Hi Wencong,

Thanks for driving the discussion, +1 for the proposal. I have two minor
questions/suggestions:

1. Is the getTaskNameWithIndexAndAttemptNumber method a bit redundant? It
can be replaced by getTaskName + getTaskIndex + getAttemptNumber.
2. I think it would be better if we could explicitly specify the alternative
(based on TaskInfo/JobInfo) for each deprecated method.
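Both suggestions can be illustrated with a small plain-Java sketch. `TaskInfo`, `Context`, `nameWithIndexAndAttempt` and the `"name (index)#attempt"` string format are simplified, hypothetical stand-ins, not Flink's actual classes or naming format: the combined string is derived from the three basic getters, and the deprecated accessor's Javadoc names its replacement.

```java
final class ContextMetadataSketch {

    /** Simplified stand-in for a task metadata class; not Flink's actual TaskInfo. */
    static final class TaskInfo {
        private final String taskName;
        private final int taskIndex;
        private final int attemptNumber;

        TaskInfo(String taskName, int taskIndex, int attemptNumber) {
            this.taskName = taskName;
            this.taskIndex = taskIndex;
            this.attemptNumber = attemptNumber;
        }

        String getTaskName() { return taskName; }

        int getTaskIndex() { return taskIndex; }

        int getAttemptNumber() { return attemptNumber; }
    }

    /** Simplified context-like API: metadata is reached through one getter instead of many. */
    interface Context {
        TaskInfo getTaskInfo();

        /**
         * Kept for compatibility.
         *
         * @deprecated use {@code getTaskInfo().getTaskName()} instead.
         */
        @Deprecated
        default String getTaskName() {
            return getTaskInfo().getTaskName();
        }
    }

    /** A combined name built only from the three basic getters; the output format is hypothetical. */
    static String nameWithIndexAndAttempt(TaskInfo info) {
        return info.getTaskName() + " (" + info.getTaskIndex() + ")#" + info.getAttemptNumber();
    }

    public static void main(String[] args) {
        Context ctx = () -> new TaskInfo("Map", 3, 1);
        System.out.println(ctx.getTaskName());                          // Map
        System.out.println(nameWithIndexAndAttempt(ctx.getTaskInfo())); // Map (3)#1
    }
}
```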

Best,
Lijie

Wencong Liu wrote on Thu, Nov 30, 2023 at 14:50:

> Hi devs,
>
> I would like to start a discussion on FLIP-382: Unify the Provision
> of Diverse Metadata for Context-like APIs [1].
>
> In the Flink project, the context-like APIs are interfaces annotated by
> @Public and supply runtime metadata and functionalities to its modules and
> components. RuntimeContext is such an interface with 27 methods for
> accessing metadata and framework functionalities. Currently, any
> addition of metadata requires updating the RuntimeContext interface
> and all 12 of its implementation classes, leading to high code
> maintenance costs. To improve this, we propose to categorize all
> metadata into some metadata classes and provide it through dedicated
> methods in context-like APIs. Newly provided metadata in context-like
> APIs will then only require updating the metadata classes, not the
> context-like API itself or its implementations.
>
> Looking forward to your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs
>
> Best regards,
> Wencong Liu


Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-11 Thread Lijie Wang
Hi devs,

After an offline discussion with the Apache Celeborn folks, we changed the
signatures of "snapshotState" and "restoreState" as follows:

void snapshotState(CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
ShuffleMasterSnapshotContext context);
void restoreState(List<ShuffleMasterSnapshot> snapshots);

We believe the above signatures are more general and flexible:
1. ShuffleMasterSnapshotContext can provide the necessary information for
taking snapshots.
2. A ShuffleMasterSnapshot interface abstracts the snapshot and supports two
kinds of snapshots: full and incremental. Incremental snapshots can be used
when the internal state is too large, making snapshot operations faster and
occupying less storage.

See the "JobEvent" section of the FLIP for details.
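A plain-Java sketch of how the full/incremental distinction could be used on restore. It is not Flink's API: `Snapshot`, `SimpleSnapshot`, `SketchShuffleMaster` and the size threshold are hypothetical, the `ShuffleMasterSnapshotContext` parameter is omitted, and the policy (a full snapshot resets the state, incremental ones are applied on top in order) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class ShuffleMasterSnapshotSketch {

    /** Hypothetical, simplified snapshot type; not Flink's ShuffleMasterSnapshot. */
    interface Snapshot {
        boolean isIncremental();

        Map<String, String> entries();
    }

    record SimpleSnapshot(boolean isIncremental, Map<String, String> entries) implements Snapshot {}

    /** Hypothetical shuffle-master state: partition id -> location. */
    static final class SketchShuffleMaster {
        private final Map<String, String> partitions = new HashMap<>();
        private final Map<String, String> sinceLastSnapshot = new HashMap<>();
        private final int fullSnapshotThreshold; // hypothetical knob: switch to increments above this size

        SketchShuffleMaster(int fullSnapshotThreshold) {
            this.fullSnapshotThreshold = fullSnapshotThreshold;
        }

        void registerPartition(String partitionId, String location) {
            partitions.put(partitionId, location);
            sinceLastSnapshot.put(partitionId, location);
        }

        /** Full snapshot while the state is small, otherwise only what changed since the last snapshot. */
        void snapshotState(CompletableFuture<Snapshot> snapshotFuture) {
            boolean incremental = partitions.size() > fullSnapshotThreshold;
            Map<String, String> data = new HashMap<>(incremental ? sinceLastSnapshot : partitions);
            sinceLastSnapshot.clear();
            snapshotFuture.complete(new SimpleSnapshot(incremental, data));
        }

        /** Replays snapshots in order: a full one resets the state, an incremental one is merged on top. */
        void restoreState(List<Snapshot> snapshots) {
            partitions.clear();
            sinceLastSnapshot.clear();
            for (Snapshot snapshot : snapshots) {
                if (!snapshot.isIncremental()) {
                    partitions.clear();
                }
                partitions.putAll(snapshot.entries());
            }
        }

        Map<String, String> partitions() {
            return Map.copyOf(partitions);
        }
    }

    public static void main(String[] args) {
        SketchShuffleMaster master = new SketchShuffleMaster(2);
        List<Snapshot> persisted = new ArrayList<>();

        master.registerPartition("p1", "worker-a");
        master.registerPartition("p2", "worker-b");
        CompletableFuture<Snapshot> first = new CompletableFuture<>();
        master.snapshotState(first);
        persisted.add(first.join()); // full: {p1, p2}

        master.registerPartition("p3", "worker-c");
        CompletableFuture<Snapshot> second = new CompletableFuture<>();
        master.snapshotState(second);
        persisted.add(second.join()); // incremental: {p3}

        SketchShuffleMaster recovered = new SketchShuffleMaster(2);
        recovered.restoreState(persisted);
        System.out.println(recovered.partitions().size()); // 3
    }
}
```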

By the way, if there are no more questions, we will start voting tomorrow.

Best,
Lijie

Lijie Wang wrote on Tue, Dec 5, 2023 at 22:57:

> Hi Paul,
>
> I believe Xintong has answered your question.
>
> >> IIUC, in the FLIP, the main method is lost after the recovery, and only
> >> submitted jobs would be recovered. Is that right?
>
> You are right: we can't recover the execution progress of the main method. So
> after the JM crashes, only submitted and incomplete jobs (as Xintong said,
> completed jobs will not be re-run) will be recovered and continue to run.
>
> Best,
> Lijie
>
> Xintong Song wrote on Tue, Dec 5, 2023 at 18:30:
>
>> @Paul,
>>
>>
>> Do you mean the scenario where users call `env.execute()` multiple times in
>> the `main()` method? I believe that is not supported currently when HA is
>> enabled, for the exact same reason you mentioned that Flink is not aware of
>> which jobs are executed and which are not.
>>
>>
>> On the other hand, if an external scheduler is used to submit multiple jobs
>> to a session cluster, Flink already has a JobResultStore for persisting
>> information about successfully completed jobs, so that only incomplete
>> jobs will be recovered. See FLIP-194[1] for more details.
>>
>>
>> Best,
>>
>> Xintong
>>
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>>
>> On Tue, Dec 5, 2023 at 6:01 PM Xintong Song wrote:
>>
>> > Thanks for addressing my comments, Lijie. LGTM
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> >
>> > On Tue, Dec 5, 2023 at 2:56 PM Paul Lam wrote:
>> >
>> >> Hi Lijie,
>> >>
>> >> Recovery for batch jobs is no doubt a long-awaited feature. Thanks for
>> >> the proposal!
>> >>
>> >> I’m concerned about the multi-job scenario. In session mode, users could
>> >> use web submission to upload and run jars which may produce multiple
>> >> Flink jobs. However, these jobs may not be submitted at once and run in
>> >> parallel. Instead, they could be dependent on other jobs like a DAG. The
>> >> schedule of the jobs is controlled by the user's main method.
>> >>
>> >> IIUC, in the FLIP, the main method is lost after the recovery, and only
>> >> submitted jobs would be recovered. Is that right?
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >> > On Nov 2, 2023, at 18:00, Lijie Wang wrote:
>> >> >
>> >> > Hi devs,
>> >> >
>> >> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support
>> >> > Job Recovery for Batch Jobs [1].
>> >> >
>> >> > Currently, when Flink’s job manager crashes or gets killed, possibly
>> >> > due to unexpected errors or planned node decommissioning, it leads to
>> >> > one of the following two situations:
>> >> > 1. Failure, if the job does not enable HA.
>> >> > 2. Restart, if the job enables HA. If it’s a streaming job, the job will
>> >> > be resumed from the last successful checkpoint. If it’s a batch job, it
>> >> > has to run from the beginning, and all previous progress will be lost.
>> >> >
>> >> > In view of this, we think a JM crash may cause a significant regression
>> >> > for batch jobs, especially long-running ones. This FLIP mainly aims to
>> >> > solve this problem so that batch jobs can recover most of their progress
>> >> > after a JM crash. In this FLIP, our goal is that most finished tasks do
>> >> > not need to be re-run.
>> >> >
>> >> > You can find more details in FLIP-383 [1]. Looking forward to your
>> >> > feedback.
>> >> >
>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
>> >> >
>> >> > Best,
>> >> > Lijie
>> >>
>> >>
>>
>


Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-05 Thread Lijie Wang
Hi Paul,

I believe Xintong has answered your question.

>> IIUC, in the FLIP, the main method is lost after the recovery, and only
>> submitted jobs would be recovered. Is that right?

You are right: we can't recover the execution progress of the main method. So
after the JM crashes, only submitted and incomplete jobs (as Xintong said,
completed jobs will not be re-run) will be recovered and continue to run.
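The rule above can be sketched in plain Java: recover the jobs that were submitted before the crash and are not recorded as completed, while anything `main()` had not yet submitted is simply gone. The job ids and the `completedJobs` set (standing in for what a JobResultStore records) are hypothetical.

```java
import java.util.List;
import java.util.Set;

final class RecoverableJobsSketch {

    /** Submitted jobs without a completed-job record are the ones resumed after the JM restarts. */
    static List<String> jobsToRecover(List<String> submittedJobs, Set<String> completedJobs) {
        return submittedJobs.stream().filter(id -> !completedJobs.contains(id)).toList();
    }

    public static void main(String[] args) {
        // Before the crash, main() had submitted job-1 and job-2; it would have submitted job-3 later.
        List<String> submitted = List.of("job-1", "job-2");
        Set<String> completed = Set.of("job-1"); // e.g. what a JobResultStore-like component recorded
        System.out.println(jobsToRecover(submitted, completed)); // [job-2]
        // job-3 never reaches the cluster: the progress of main() itself is not recovered.
    }
}
```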

Best,
Lijie

Xintong Song wrote on Tue, Dec 5, 2023 at 18:30:

> @Paul,
>
>
> Do you mean the scenario where users call `env.execute()` multiple times in
> the `main()` method? I believe that is not supported currently when HA is
> enabled, for the exact same reason you mentioned that Flink is not aware of
> which jobs are executed and which are not.
>
>
> On the other hand, if an external scheduler is used to submit multiple jobs
> to a session cluster, Flink already has a JobResultStore for persisting
> information about successfully completed jobs, so that only incomplete
> jobs will be recovered. See FLIP-194[1] for more details.
>
>
> Best,
>
> Xintong
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Tue, Dec 5, 2023 at 6:01 PM Xintong Song wrote:
>
> > Thanks for addressing my comments, Lijie. LGTM
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Tue, Dec 5, 2023 at 2:56 PM Paul Lam wrote:
> >
> >> Hi Lijie,
> >>
> >> Recovery for batch jobs is no doubt a long-awaited feature. Thanks for
> >> the proposal!
> >>
> >> I’m concerned about the multi-job scenario. In session mode, users could
> >> use web submission to upload and run jars which may produce multiple
> >> Flink jobs. However, these jobs may not be submitted at once and run in
> >> parallel. Instead, they could be dependent on other jobs like a DAG. The
> >> schedule of the jobs is controlled by the user's main method.
> >>
> >> IIUC, in the FLIP, the main method is lost after the recovery, and only
> >> submitted jobs would be recovered. Is that right?
> >>
> >> Best,
> >> Paul Lam
> >>
> >> > On Nov 2, 2023, at 18:00, Lijie Wang wrote:
> >> >
> >> > Hi devs,
> >> >
> >> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support
> >> > Job Recovery for Batch Jobs [1].
> >> >
> >> > Currently, when Flink’s job manager crashes or gets killed, possibly
> >> > due to unexpected errors or planned node decommissioning, it leads to
> >> > one of the following two situations:
> >> > 1. Failure, if the job does not enable HA.
> >> > 2. Restart, if the job enables HA. If it’s a streaming job, the job will
> >> > be resumed from the last successful checkpoint. If it’s a batch job, it
> >> > has to run from the beginning, and all previous progress will be lost.
> >> >
> >> > In view of this, we think a JM crash may cause a significant regression
> >> > for batch jobs, especially long-running ones. This FLIP mainly aims to
> >> > solve this problem so that batch jobs can recover most of their progress
> >> > after a JM crash. In this FLIP, our goal is that most finished tasks do
> >> > not need to be re-run.
> >> >
> >> > You can find more details in FLIP-383 [1]. Looking forward to your
> >> > feedback.
> >> >
> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
> >> >
> >> > Best,
> >> > Lijie
> >>
> >>
>


Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-12-04 Thread Lijie Wang
Thanks for raising this valuable point, Xintong.

Supporting external shuffle services makes sense to me. In order to recover
the internal states of the ShuffleMaster after the JM restarts, we will add the
following 3 methods to ShuffleMaster:

boolean supportsBatchSnapshot();
void snapshotState(CompletableFuture<byte[]> snapshotFuture);
void restoreState(byte[] snapshotData);

We will provide empty implementations by default. If an external shuffle service
wants to support Job Recovery, it needs to override these methods. Before
the job starts running, we will check whether the shuffle master supports
taking snapshots (through the supportsBatchSnapshot method). If it does not,
we will disable Job Recovery for the job.

The default Netty/TM shuffle is stateless, so we only need to override the
"supportsBatchSnapshot" method to let it return true ("snapshotState" and
"restoreState" keep empty implementations).

You can find more details in the "JobEvent" section of the FLIP.
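A minimal sketch of the opt-in pattern described above, using Java default methods. `SnapshottableShuffleMaster`, `jobRecoveryEnabled` and the two implementations are hypothetical stand-ins, not Flink's `ShuffleMaster`; in particular, completing the future with an empty payload in the default `snapshotState` is an assumption of the sketch (so callers never wait on a stateless shuffle), not something stated in this thread.

```java
import java.util.concurrent.CompletableFuture;

final class BatchSnapshotSupportSketch {

    /** Hypothetical stand-in for the three proposed methods, all with default implementations. */
    interface SnapshottableShuffleMaster {
        default boolean supportsBatchSnapshot() {
            return false; // services that don't override this keep Job Recovery disabled
        }

        default void snapshotState(CompletableFuture<byte[]> snapshotFuture) {
            snapshotFuture.complete(new byte[0]); // nothing to persist (assumption of this sketch)
        }

        default void restoreState(byte[] snapshotData) {
            // nothing to restore
        }
    }

    /** Stateless shuffle, like the default Netty/TM shuffle described above: it only opts in. */
    static final class StatelessShuffle implements SnapshottableShuffleMaster {
        @Override
        public boolean supportsBatchSnapshot() {
            return true;
        }
    }

    /** A shuffle service that has not implemented the new methods. */
    static final class LegacyShuffle implements SnapshottableShuffleMaster {}

    /** Checked before the job starts: recovery stays on only if the shuffle master can take snapshots. */
    static boolean jobRecoveryEnabled(boolean requested, SnapshottableShuffleMaster shuffleMaster) {
        return requested && shuffleMaster.supportsBatchSnapshot();
    }

    public static void main(String[] args) {
        System.out.println(jobRecoveryEnabled(true, new StatelessShuffle())); // true
        System.out.println(jobRecoveryEnabled(true, new LegacyShuffle()));    // false
    }
}
```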

Best,
Lijie

Xintong Song wrote on Mon, Dec 4, 2023 at 15:34:

> Thanks for the proposal, Lijie and Zhu.
>
> I have been having offline discussions with the Apache Celeborn folks
> regarding integrating Apache Celeborn into Flink's Hybrid Shuffle mode. One
> thing coming from those discussions that might relate to this FLIP is that
> Celeborn maintains some internal states inside its LifecycleManager (think
> of this as a component resident in Flink's Shuffle Master), which would
> also need persistence and recovery in order for the partitions to be reused
> after a JM crash. Given that Flink supports pluggable shuffle services,
> there could be other custom shuffle services with similar demands. I wonder
> if it makes sense to also add interfaces that take snapshots from the Shuffle
> Master once in a while, and provide such snapshots to the Shuffle Master upon
> recovery?
>
> Best,
>
> Xintong
>
>
>
> On Thu, Nov 30, 2023 at 5:48 PM Lijie Wang wrote:
>
> > Hi Guowei,
> >
> > Thanks for your feedback.
> >
> > >> As far as I know, there are multiple job managers on standby in some
> > >> scenarios. In this case, is your design still effective?
> > I think it's still effective. There will only be one leader. After becoming
> > the leader, the startup process of the JobMaster is the same as when a single
> > JobManager restarts, so I think the current process should also be
> > applicable to the multi-JobManager situation. We will also do some tests to
> > cover this case.
> >
> > >> How do you rule out that there might still be some states in the memory
> > >> of the original operator coordinator?
> > The current restore process is the same as when streaming jobs restore from a
> > checkpoint after failover (the same methods are called), which is widely used
> > in production, so I think there is no problem.
> >
> > >> Additionally, using NO_CHECKPOINT seems a bit odd. Why not use a normal
> > >> checkpoint ID greater than 0 and record it in the event store?
> > We use -1 (NO_CHECKPOINT) to distinguish it from a normal checkpoint: -1
> > indicates that this is a snapshot for the no-checkpoint/batch scenarios.
> >
> > Besides, considering that some operator coordinators may not currently
> > support taking snapshots in the no-checkpoint/batch scenarios (or don't
> > support passing -1 as a checkpoint id), we think it is better to let the
> > developer explicitly specify whether it supports snapshots in the batch
> > scenario. Therefore, we intend to introduce the "SupportsBatchSnapshot"
> > interface for split enumerators and the "supportsBatchSnapshot" method for
> > operator coordinators. You can find more details in the "Introduce
> > SupportsBatchSnapshot interface" and "JobEvent" sections of the FLIP.
> >
> > Looking forward to your further feedback.
> >
> > Best,
> > Lijie
> >
> > Guowei Ma wrote on Sun, Nov 19, 2023 at 10:47:
> >
> > > Hi,
> > >
> > >
> > > This is a very good proposal. As far as I know, it can solve some very
> > > critical production operations in certain scenarios. I have two minor
> > > issues:
> > >
> > > As far as I know, there are multiple job managers on standby in some
> > > scenarios. In this case, is your design still effective? I'm unsure if you
> > > have conducted any tests. For instance, standby job managers might take
> > > over these failed jobs more quickly.
> > > Regarding the part about the operator coordinator, how can you ensure that
> > > the checkpoint mechanism can restore the state of the operator coordinator:
> > >

Re: [DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-11-30 Thread Lijie Wang
Hi Guowei,

Thanks for your feedback.

>> As far as I know, there are multiple job managers on standby in some
>> scenarios. In this case, is your design still effective?
I think it's still effective. There will only be one leader. After becoming
the leader, the startup process of the JobMaster is the same as when a single
JobManager restarts, so I think the current process should also be
applicable to the multi-JobManager situation. We will also do some tests to
cover this case.

>> How do you rule out that there might still be some states in the memory
>> of the original operator coordinator?
The current restore process is the same as when streaming jobs restore from a
checkpoint after failover (the same methods are called), which is widely used
in production, so I think there is no problem.

>> Additionally, using NO_CHECKPOINT seems a bit odd. Why not use a normal
>> checkpoint ID greater than 0 and record it in the event store?
We use -1 (NO_CHECKPOINT) to distinguish it from a normal checkpoint: -1
indicates that this is a snapshot for the no-checkpoint/batch scenarios.

Besides, considering that some operator coordinators may not currently
support taking snapshots in the no-checkpoint/batch scenarios (or don't
support passing -1 as a checkpoint id), we think it is better to let the
developer explicitly specify whether it supports snapshots in the batch
scenario. Therefore, we intend to introduce the "SupportsBatchSnapshot"
interface for split enumerators and the "supportsBatchSnapshot" method for
operator coordinators. You can find more details in the "Introduce
SupportsBatchSnapshot interface" and "JobEvent" sections of the FLIP.
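A plain-Java sketch of the opt-in idea: the coordinator asks for a batch snapshot, with -1 as the checkpoint id, only from enumerators that implement a marker interface. All types here (`SupportsBatchSnapshot`, `Enumerator`, the two enumerators, `takeBatchSnapshot`) are simplified hypothetical stand-ins, not Flink's classes, and what the job does when an enumerator has not opted in is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class BatchSnapshotMarkerSketch {

    /** Checkpoint id passed for batch snapshots, mirroring the -1 (NO_CHECKPOINT) convention above. */
    static final long NO_CHECKPOINT = -1L;

    /** Hypothetical marker interface: an enumerator declares that batch snapshots are safe for it. */
    interface SupportsBatchSnapshot {}

    /** Simplified enumerator; not Flink's SplitEnumerator. */
    interface Enumerator<T> {
        T snapshotState(long checkpointId);
    }

    /** Opted in: its state (the unassigned splits) does not depend on the checkpoint id. */
    static final class RemainingSplitsEnumerator implements Enumerator<List<String>>, SupportsBatchSnapshot {
        private final List<String> remainingSplits = new ArrayList<>(List.of("split-0", "split-1"));

        @Override
        public List<String> snapshotState(long checkpointId) {
            return List.copyOf(remainingSplits);
        }
    }

    /** Not opted in, e.g. because it assumes checkpoint ids are positive. */
    static final class LegacyEnumerator implements Enumerator<List<String>> {
        @Override
        public List<String> snapshotState(long checkpointId) {
            if (checkpointId <= 0) {
                throw new IllegalArgumentException("expects a real checkpoint id, got " + checkpointId);
            }
            return List.of();
        }
    }

    /** Coordinator side: only ask enumerators that opted in, using NO_CHECKPOINT as the id. */
    static <T> Optional<T> takeBatchSnapshot(Enumerator<T> enumerator) {
        if (enumerator instanceof SupportsBatchSnapshot) {
            return Optional.of(enumerator.snapshotState(NO_CHECKPOINT));
        }
        return Optional.empty(); // batch snapshot not supported
    }

    public static void main(String[] args) {
        System.out.println(takeBatchSnapshot(new RemainingSplitsEnumerator())); // Optional[[split-0, split-1]]
        System.out.println(takeBatchSnapshot(new LegacyEnumerator()));          // Optional.empty
    }
}
```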

Looking forward to your further feedback.

Best,
Lijie

Guowei Ma wrote on Sun, Nov 19, 2023 at 10:47:

> Hi,
>
>
> This is a very good proposal. As far as I know, it can solve some very
> critical production operations in certain scenarios. I have two minor
> issues:
>
> As far as I know, there are multiple job managers on standby in some
> scenarios. In this case, is your design still effective? I'm unsure if you
> have conducted any tests. For instance, standby job managers might take
> over these failed jobs more quickly.
> Regarding the part about the operator coordinator, how can you ensure that
> the checkpoint mechanism can restore the state of the operator coordinator:
> For example:
> How do you rule out that there might still be some states in the memory of
> the original operator coordinator? After all, the implementation was done
> under the assumption of scenarios where the job manager doesn't fail.
> Additionally, using NO_CHECKPOINT seems a bit odd. Why not use a normal
> checkpoint ID greater than 0 and record it in the event store?
> If the issues raised in point 2 cannot be resolved in the short term, would
> it be possible to consider not supporting failover with a source job
> manager?
>
> Best,
> Guowei
>
>
> On Thu, Nov 2, 2023 at 6:01 PM Lijie Wang wrote:
>
> > Hi devs,
> >
> > Zhu Zhu and I would like to start a discussion about FLIP-383: Support
> > Job Recovery for Batch Jobs [1].
> >
> > Currently, when Flink’s job manager crashes or gets killed, possibly
> > due to unexpected errors or planned node decommissioning, it leads to
> > one of the following two situations:
> > 1. Failure, if the job does not enable HA.
> > 2. Restart, if the job enables HA. If it’s a streaming job, the job will
> > be resumed from the last successful checkpoint. If it’s a batch job, it
> > has to run from the beginning, and all previous progress will be lost.
> >
> > In view of this, we think a JM crash may cause a significant regression
> > for batch jobs, especially long-running ones. This FLIP mainly aims to
> > solve this problem so that batch jobs can recover most of their progress
> > after a JM crash. In this FLIP, our goal is that most finished tasks do
> > not need to be re-run.
> >
> > You can find more details in FLIP-383 [1]. Looking forward to your
> > feedback.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs
> >
> > Best,
> > Lijie
> >
>


Re: [VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-29 Thread Lijie Wang
+1 (binding)

Best,
Lijie

Zhu Zhu wrote on Thu, Nov 30, 2023 at 13:13:

> +1
>
> Thanks,
> Zhu
>
> Xia Sun wrote on Thu, Nov 30, 2023 at 11:41:
>
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-379: Dynamic source parallelism inference
> > for batch jobs[1] which has been discussed in this thread [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection or
> > not enough votes.
> >
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> > [2] https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8
> >
> >
> > Best Regards,
> > Xia
> >
>


Re: [VOTE] FLIP-381: Deprecate configuration getters/setters that return/set complex Java objects

2023-11-12 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Fri, Nov 10, 2023 at 18:32, Yuepeng Pan wrote:

> +1(non-binding)
>
> Best,
> Roc
>
> On 2023/11/10 03:58:10 Junrui Lee wrote:
> > Hi everyone,
> >
> > Thank you to everyone for the feedback on FLIP-381: Deprecate
> > configuration getters/setters that return/set complex Java objects[1],
> > which has been discussed in this thread [2].
> >
> > I would like to start a vote for it. The vote will be open for at least
> > 72 hours (excluding weekends) unless there is an objection or not enough
> > votes.
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
> > [2] https://lists.apache.org/thread/y5owjkfxq3xs9lmpdbl6d6jmqdgbjqxo
> >
>


Re: [DISCUSS] FLIP-381: Deprecate configuration getters/setters that return/set complex Java objects

2023-11-02 Thread Lijie Wang
Thanks Junrui for driving this.

Making configurations simple and consistent has great benefits for both
users and devs. +1 for the proposal.

Best,
Lijie

On Thu, Nov 2, 2023 at 16:49, weijie guo wrote:

> Thanks Junrui for driving this proposal!
>
> I believe this is helpful for the new Process Function API, because after
> this we won't need to move some related classes/components from flink-core
> to a pure API module (maybe called flink-core-api). Even though the FLIP
> for the new API is still in preparation, I want to emphasize that our goal
> is for user applications to no longer depend on these classes. So I'm +1
> for this proposal.
>
>
> Best regards,
>
> Weijie
>
>
> On Thu, Nov 2, 2023 at 16:00, Zhu Zhu wrote:
>
> > Thanks Junrui for creating the FLIP and kicking off this discussion.
> >
> > The community has been constantly striving to unify and simplify the
> > configuration layer of Flink. Some progress has already been made,
> > such as FLINK-29379. However, the compatibility of public interfaces
> > poses an obstacle to completing the task. The release of Flink 2.0
> > presents a great opportunity to accomplish this goal.
> >
> > +1 for the proposal.
> >
> > Thanks,
> > Zhu
> >
> > On Thu, Nov 2, 2023 at 10:27, Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Thanks Junrui for driving this proposal!
> > >
> > > ConfigOption is easy to use for Flink users, makes options easy to
> > > manage for Flink platform maintainers, and is easy to maintain for
> > > Flink developers and the Flink community.
> > >
> > > So big +1 for this proposal!
> > >
> > > Best,
> > > Rui
> > >
> > > On Thu, Nov 2, 2023 at 10:10 AM Junrui Lee 
> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I would like to start a discussion on FLIP-381: Deprecate
> > > > configuration getters/setters that return/set complex Java objects[1].
> > > >
> > > > Currently, the job configuration in Flink is spread across different
> > > > components, which leads to inconsistencies and confusion. To address
> > > > this issue, the non-ConfigOption complex Java objects need to be
> > > > migrated to ConfigOption, with a single Configuration object hosting
> > > > all the configuration.
> > > > However, there is a significant blocker to implementing this solution:
> > > > these complex Java objects in StreamExecutionEnvironment,
> > > > CheckpointConfig, and ExecutionConfig have already been exposed
> > > > through the public API, making it challenging to modify the existing
> > > > implementation.
> > > >
> > > > Therefore, I propose to deprecate these Java objects and their
> > > > corresponding getter/setter interfaces, and ultimately remove them in
> > > > Flink 2.0.
> > > > Your feedback and thoughts on this proposal are highly appreciated.
> > > >
> > > > Best regards,
> > > > Junrui Lee
> > > >
> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
> > > >
> > >
> >
>
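For readers less familiar with the ConfigOption approach, the plain-Java sketch below illustrates the idea behind the proposal: every setting is a typed, keyed option with a default value, and one configuration object hosts all of them as key-value pairs, instead of complex Java objects exposed through getters/setters. The Option and Config classes and the example.* keys are hypothetical simplifications, not Flink's actual ConfigOption/Configuration API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified illustration of typed options hosted in one configuration object. */
public final class ConfigOptionSketch {

    /** A typed option: a string key plus a default value. */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** A single object that hosts every setting as a key-value pair. */
    static final class Config {
        private final Map<String, Object> values = new HashMap<>();

        <T> Config set(Option<T> option, T value) {
            values.put(option.key, Objects.requireNonNull(value));
            return this;
        }

        @SuppressWarnings("unchecked")
        <T> T get(Option<T> option) {
            // Falls back to the option's default when the key was never set.
            return (T) values.getOrDefault(option.key, option.defaultValue);
        }

        /** Every setting is a plain key-value pair, so it can be written to or read from a file. */
        Map<String, Object> toMap() {
            return new HashMap<>(values);
        }
    }

    // Hypothetical keys, chosen for illustration only.
    static final Option<Integer> PARALLELISM = new Option<>("example.parallelism", 1);
    static final Option<Integer> MAX_RETRIES = new Option<>("example.max-retries", 3);

    public static void main(String[] args) {
        Config config = new Config().set(PARALLELISM, 4);
        System.out.println(config.get(PARALLELISM)); // 4 (explicitly set)
        System.out.println(config.get(MAX_RETRIES)); // 3 (default)
        System.out.println(config.toMap());          // {example.parallelism=4}
    }
}
```

The design point is that nothing in Config depends on a specific option type, so adding a setting means declaring a new Option rather than adding a new getter/setter pair.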


[DISCUSS] FLIP-383: Support Job Recovery for Batch Jobs

2023-11-02 Thread Lijie Wang
Hi devs,

Zhu Zhu and I would like to start a discussion about FLIP-383: Support Job
Recovery for Batch Jobs[1]

Currently, when Flink’s job manager crashes or gets killed, possibly due to
unexpected errors or planned node decommissioning, one of the following two
things happens:
1. The job fails, if HA is not enabled.
2. The job restarts, if HA is enabled. A streaming job resumes from the last
successful checkpoint, but a batch job has to run from the beginning, and all
previous progress is lost.

In view of this, we think a JM crash can cause a significant regression for
batch jobs, especially long-running ones. This FLIP aims to solve this problem
so that batch jobs can recover most of their progress after a JM crash. Our
goal is that most finished tasks do not need to be re-run.
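To make the goal concrete, here is a minimal Java sketch of the general idea. It is not the design in FLIP-383. It assumes the JM durably records a "task finished" event for each task and, after a restart, replays those events so that only unfinished tasks are scheduled again. The class and method names (JobEventLog, tasksToReRun) are hypothetical, the "durable" log is just an in-memory list, and the sketch ignores the case where a finished task's output has been lost.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: skip already-finished tasks when recovering after a JM restart. */
public class BatchRecoverySketch {

    /** Stand-in for a durable event store; here it is only an in-memory list. */
    static final class JobEventLog {
        private final List<String> finishedTaskEvents = new ArrayList<>();

        void recordTaskFinished(String taskId) {
            finishedTaskEvents.add(taskId);
        }

        /** Replays the log and returns the set of tasks known to have finished. */
        Set<String> replayFinishedTasks() {
            return new HashSet<>(finishedTaskEvents);
        }
    }

    /** Returns the tasks that must be scheduled again after a JM failover. */
    static List<String> tasksToReRun(List<String> allTasks, JobEventLog log) {
        Set<String> finished = log.replayFinishedTasks();
        List<String> result = new ArrayList<>();
        for (String task : allTasks) {
            if (!finished.contains(task)) {
                result.add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("source-0", "source-1", "map-0", "sink-0");
        JobEventLog log = new JobEventLog();
        log.recordTaskFinished("source-0");
        log.recordTaskFinished("source-1");
        // Simulated JM crash here: on restart only the unfinished tasks are re-run.
        System.out.println(tasksToReRun(tasks, log)); // [map-0, sink-0]
    }
}
```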

You can find more details in FLIP-383[1]. Looking forward to your
feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs

Best,
Lijie


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-10-31 Thread Lijie Wang
Hi Xia,

Thanks for driving this FLIP, +1 for the proposal.

I have 2 questions about the relationship between static inference and
dynamic inference:

1. AFAIK, the Hive table source currently enables static inference by
default. In this case, which one (static or dynamic) will take effect? I
think it would be better to point this out in the FLIP.

2. As you mentioned above, dynamic inference is the ideal approach, so do
we plan to deprecate static inference in the future?

Best,
Lijie

On Tue, Oct 31, 2023 at 20:19, Zhu Zhu wrote:

> Thanks for opening the FLIP and kicking off this discussion, Xia!
> The proposed changes fill an important gap in the dynamic parallelism
> inference of the adaptive batch scheduler.
>
> Besides that, it is also a good step towards supporting dynamic
> parallelism inference for streaming sources, e.g. allowing Kafka
> sources to determine their parallelism automatically based on the
> number of partitions.
>
> +1 for the proposal.
>
> Thanks,
> Zhu
>
> On Tue, Oct 31, 2023 at 16:01, Xia Sun wrote:
>
> > Hi everyone,
> > I would like to start a discussion on FLIP-379: Dynamic source
> > parallelism inference for batch jobs[1].
> >
> > In general, there are three main ways to set source parallelism for batch
> > jobs:
> > (1) User-defined source parallelism.
> > (2) Connector static parallelism inference.
> > (3) Dynamic parallelism inference.
> >
> > Compared to manually setting parallelism, automatic parallelism inference
> > is easier to use and adapts better to data volumes that vary from day to
> > day. However, static parallelism inference cannot leverage runtime
> > information, so its results can be inaccurate. Therefore, dynamic
> > parallelism inference is the ideal approach for batch jobs, but the
> > adaptive batch scheduler's current support for it is not yet
> > comprehensive.
> >
> > To address this, we aim to introduce a general interface that enables the
> > adaptive batch scheduler to dynamically infer the source parallelism at
> > runtime. Please refer to the FLIP[1] document for more details about the
> > proposed design and implementation.
> >
> > I also thank Zhu Zhu and Lijie Wang for their suggestions during the
> > pre-discussion.
> > Looking forward to your feedback and suggestions, thanks.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >
> > Best regards,
> > Xia
> >
>
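As a rough illustration of why runtime information helps, the sketch below derives a source's parallelism from the amount of input data observed at runtime and clamps it to configured bounds. The formula (input bytes divided by a target data volume per task, rounded up) is an assumption made for illustration. It is not the interface proposed in FLIP-379, and all names are hypothetical.

```java
/** Hypothetical sketch: infer a source's parallelism from the runtime input size. */
public final class DynamicParallelismSketch {

    static int inferParallelism(long inputBytes, long bytesPerTask, int minParallelism, int maxParallelism) {
        if (bytesPerTask <= 0 || minParallelism <= 0 || minParallelism > maxParallelism) {
            throw new IllegalArgumentException("invalid bounds or bytesPerTask");
        }
        // Ceiling division: the number of tasks needed so that each reads at most bytesPerTask.
        long needed = (inputBytes + bytesPerTask - 1) / bytesPerTask;
        // Clamp to [minParallelism, maxParallelism].
        return (int) Math.max(minParallelism, Math.min(maxParallelism, needed));
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        System.out.println(inferParallelism(10 * gib, gib, 1, 128));   // 10
        System.out.println(inferParallelism(0, gib, 1, 128));          // 1 (lower bound)
        System.out.println(inferParallelism(1024 * gib, gib, 1, 128)); // 128 (upper bound)
    }
}
```

A static inference would have to choose this number before the data volume is known, which is why the same job could end up over- or under-provisioned on different days.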


Re: [VOTE] Release 1.18.0, release candidate #2

2023-10-16 Thread Lijie Wang
+1 (non-binding)

 -  Verified the signature and checksum
 -  Built from the source code
 -  Ran an example job on a YARN cluster
 -  Checked the website PR
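The checksum part of this verification can also be scripted. Below is a minimal Java sketch that computes a file's SHA-512 digest and compares it with a companion .sha512 file. It assumes the digest is the first whitespace-separated token of that file, and the file names in main are hypothetical stand-ins. Signature verification (usually done with gpg against the KEYS file) is not covered.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: verify a release artifact against a .sha512 checksum file. */
public final class ChecksumSketch {

    /** Streams the file through SHA-512 and returns the lowercase hex digest. */
    static String sha512Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-512");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** Assumes the checksum file starts with the hex digest, optionally followed by a file name. */
    static boolean matchesChecksumFile(Path artifact, Path checksumFile)
            throws IOException, NoSuchAlgorithmException {
        String content = new String(Files.readAllBytes(checksumFile), StandardCharsets.UTF_8).trim();
        String expected = content.split("\\s+")[0];
        return expected.equalsIgnoreCase(sha512Hex(artifact));
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-ins for a source tarball and its .sha512 file.
        Path artifact = Files.createTempFile("artifact", ".tgz");
        Files.write(artifact, "hello".getBytes(StandardCharsets.UTF_8));
        Path checksum = Files.createTempFile("artifact", ".tgz.sha512");
        Files.write(checksum, (sha512Hex(artifact) + "  artifact.tgz\n").getBytes(StandardCharsets.UTF_8));
        System.out.println(matchesChecksumFile(artifact, checksum)); // true
    }
}
```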

Best,
Lijie

On Mon, Oct 16, 2023 at 18:43, Jing Ge wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version
> 1.18.0, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
>
> * JIRA release notes [1], and the pull request adding release note for
> users [2]
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [3], which are signed with the key with
> fingerprint 96AE0E32CBE6E0753CE6 [4],
> * all artifacts to be deployed to the Maven Central Repository [5],
> * source code tag "release-1.18.0-rc2" [6],
> * website pull request listing the new release and adding announcement blog
> post [7].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Best regards,
> Konstantin, Qingsheng, Sergey, and Jing
>
> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
> [2] https://github.com/apache/flink/pull/23527
> [3] https://dist.apache.org/repos/dist/dev/flink/flink-1.18.0-rc2/
> [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> [5] https://repository.apache.org/content/repositories/orgapacheflink-1658
> [6] https://github.com/apache/flink/releases/tag/release-1.18.0-rc2
> [7] https://github.com/apache/flink-web/pull/680
>


Re: [ANNOUNCE] New Apache Flink Committer - Jane Chan

2023-10-15 Thread Lijie Wang
Congratulations, Jane!

Best,
Lijie

On Mon, Oct 16, 2023 at 10:04, Samrat Deb wrote:

> Congratulations Jane Chan :)
>
> On Mon, 16 Oct 2023 at 7:30 AM, tison  wrote:
>
> > Congrats! I noticed Jane has been around for a while; well-deserved.
> >
> > Best,
> > tison.
> >
> >
> > On Mon, Oct 16, 2023 at 09:58, Jark Wu wrote:
> >
> > > Hi, everyone
> > >
> > > On behalf of the PMC, I'm very happy to announce Jane Chan as a new
> > > Flink Committer.
> > >
> > > Jane started contributing code in Jan 2021 and has been active in the
> > > Flink community since. She authored more than 60 PRs and reviewed more
> > > than 40 PRs. Her contributions mainly revolve around Flink SQL,
> > > including Plan Advice (FLIP-280), operator-level state TTL (FLIP-292),
> > > and ALTER TABLE statements (FLINK-21634). Jane participated deeply in
> > > development discussions and also helped answer user questions on the
> > > mailing lists. Jane was also a core contributor to Flink Table Store
> > > (now Paimon) in the project's early days.
> > >
> > > Please join me in congratulating Jane Chan for becoming a Flink
> > > Committer!
> > >
> > > Best,
> > > Jark Wu (on behalf of the Flink PMC)
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Ron Liu

2023-10-15 Thread Lijie Wang
Congratulations, Ron!

Best,
Lijie

On Mon, Oct 16, 2023 at 10:03, Samrat Deb wrote:

> Congratulations Ron Liu :)
>
> On Mon, 16 Oct 2023 at 7:29 AM, tison  wrote:
>
> > Congrats! Glad to see more and more committers on board :D
> >
> > Enjoy your journey ;-)
> >
> > Best,
> > tison.
> >
> >
> > On Mon, Oct 16, 2023 at 09:57, Jark Wu wrote:
> >
> > > Hi, everyone
> > >
> > > On behalf of the PMC, I'm very happy to announce Ron Liu as a new Flink
> > > Committer.
> > >
> > > Ron has been continuously contributing to the Flink project for many
> > > years and has authored and reviewed a lot of code. He mainly works on
> > > Flink SQL and drove several important FLIPs, e.g., USING JAR
> > > (FLIP-214), Operator Fusion CodeGen (FLIP-315), and Runtime Filter
> > > (FLIP-324). He has great knowledge of Batch SQL and has significantly
> > > improved batch performance over the past several releases. He is also
> > > quite active in mailing lists, participating in discussions and
> > > answering user questions.
> > >
> > > Please join me in congratulating Ron Liu for becoming a Flink
> > > Committer!
> > >
> > > Best,
> > > Jark Wu (on behalf of the Flink PMC)
> > >
> >
>


Re: [VOTE] FLIP-366: Support standard YAML for FLINK configuration

2023-10-12 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Fri, Oct 13, 2023 at 10:56, Zhanghao Chen wrote:

> +1 (non-binding)
>
> Best,
> Zhanghao Chen
> 
> From: Junrui Lee 
> Sent: Friday, October 13, 2023 10:12
> To: dev@flink.apache.org 
> Subject: [VOTE] FLIP-366: Support standard YAML for FLINK configuration
>
> Hi all,
>
> Thank you to everyone for the feedback on FLIP-366[1]: Support standard
> YAML for FLINK configuration in the discussion thread [2].
> I would like to start a vote for it. The vote will be open for at least 72
> hours (excluding weekends), unless there is an objection or an insufficient
> number of votes.
>
> Thanks,
> Junrui
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-366%3A+Support+standard+YAML+for+FLINK+configuration
> [2]https://lists.apache.org/thread/qfhcm7h8r5xkv38rtxwkghkrcxg0q7k5
>


Re: [Discuss] FLIP-366: Support standard YAML for FLINK configuration

2023-09-21 Thread Lijie Wang
Hi Junrui,

+1 for this proposal, thanks for driving.

Best,
Lijie

On Fri, Sep 22, 2023 at 10:07, ConradJam wrote:

> +1 Support for standard YAML format facilitates specification
>
> On Fri, Sep 22, 2023 at 02:23, Jing Ge wrote:
>
> > Hi Junrui,
> >
> > +1 for following the standard. Thanks for your effort!
> >
> > Best regards,
> > Jing
> >
> > On Thu, Sep 21, 2023 at 5:09 AM Junrui Lee  wrote:
> >
> > > Hi Jane,
> > >
> > > Thank you for your valuable feedback and suggestions.
> > > I agree with your point that it is hard to tell "flink-config.yaml"
> > > and "flink-conf.yaml" apart at a glance, and therefore hard to know
> > > which one uses the standard syntax.
> > >
> > > While I understand your suggestion of using "flink-conf-default.yaml"
> > > to represent the default YAML file for Flink 1.x, I have been
> > > considering the option of using "flink-configuration.yaml" as the file
> > > name for the new configuration file.
> > > The name "flink-configuration.yaml" clearly distinguishes the new
> > > configuration file from the old one, and it does not introduce any
> > > additional semantics. Moreover, it can continue to be used in future
> > > versions such as Flink 2.0.
> > >
> > > WDYT? If we can reach a consensus on this, I will update the FLIP
> > > documentation accordingly.
> > >
> > > Best regards,
> > > Junrui
> > >
> > > On Wed, Sep 20, 2023 at 23:38, Jane Chan wrote:
> > >
> > > > Hi Junrui,
> > > >
> > > > Thanks for driving this FLIP. +1 for adopting the standard YAML
> > > > syntax.
> > > > I just have one minor suggestion. It's a little bit challenging to
> > > > differentiate between `flink-config.yaml` and `flink-conf.yaml` to
> > > > determine at a glance which one uses the standard syntax. How about
> > > > using `flink-conf-default.yaml` to represent the default yaml file
> > > > for Flink 1.x?
> > > >
> > > > Best,
> > > > Jane
> > > >
> > > > On Wed, Sep 20, 2023 at 11:06 AM Junrui Lee 
> > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I would like to start a discussion about FLIP-366:
> > > > > Support standard YAML for FLINK configuration[1]
> > > > >
> > > > > The current flink-conf.yaml parser in Flink is not a standard YAML
> > > > > parser, which has some shortcomings.
> > > > > Firstly, it does not support nested configuration items and only
> > > > > supports key-value pairs, resulting in poor readability. Secondly, if
> > > > > the value is a collection type, such as a List or Map, users are
> > > > > required to write the value in a Flink-specific pattern, which is
> > > > > inconvenient to use.
> > > > > Additionally, Flink's parser differs in syntax from a standard YAML
> > > > > parser, for example in how it parses comments and null values. These
> > > > > inconsistencies can cause confusion for users, as seen in
> > > > > FLINK-15358 and FLINK-32740.
> > > > >
> > > > > Supporting standard YAML resolves these issues: users can create a
> > > > > Flink configuration file using third-party tools and leverage some
> > > > > advanced YAML features. Therefore, we propose to support standard
> > > > > YAML for Flink configuration.
> > > > >
> > > > > You can find more details in FLIP-366[1]. Looking forward to your
> > > > > feedback.
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-366%3A+Support+standard+YAML+for+FLINK+configuration
> > > > >
> > > > > Best,
> > > > > Junrui
> > > > >
> > > >
> > >
> >
>
>
> --
> Best
>
> ConradJam
>
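To illustrate how nested standard-YAML entries can still map onto Flink's dotted option keys, here is a Java sketch that flattens an already-parsed nested map (the shape a YAML library typically produces) into dotted keys. The YAML parsing step is assumed rather than shown, keeping lists intact as values is an assumption, and this is not necessarily how FLIP-366 implements it.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: flatten parsed nested YAML into Flink-style dotted keys. */
public final class YamlFlattenSketch {

    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new TreeMap<>(); // sorted for stable output
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested mapping: recurse, extending the dotted key.
                flattenInto(key, (Map<String, Object>) value, out);
            } else {
                // Scalars and lists (e.g. a YAML sequence) are kept as values.
                out.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        // Equivalent to the YAML:
        //   taskmanager:
        //     numberOfTaskSlots: 2
        //   pipeline:
        //     jars: [a.jar, b.jar]
        Map<String, Object> nested = Map.of(
                "taskmanager", Map.of("numberOfTaskSlots", 2),
                "pipeline", Map.of("jars", List.of("a.jar", "b.jar")));
        System.out.println(flatten(nested));
        // {pipeline.jars=[a.jar, b.jar], taskmanager.numberOfTaskSlots=2}
    }
}
```

With a real YAML list as the value, no Flink-specific separator pattern is needed to express a collection.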


Re: [ANNOUNCE] Release 1.18.0, release candidate #0

2023-09-20 Thread Lijie Wang
Hi community and release managers:

We found a critical bug[1] in the REST client a few days ago, which may
exhaust the available inodes. The fix PR[2] is now ready to merge, and I
would like to backport it to release-1.18.

Please let me know if you have any concerns. Thanks.

[1] https://issues.apache.org/jira/browse/FLINK-32974
[2] https://github.com/apache/flink/pull/23363

Best,
Lijie

On Tue, Sep 19, 2023 at 17:26, Zakelly Lan wrote:

> Hi Yuan and Jing,
>
> Thank you for sharing your thoughts. I completely agree that it is our
> top priority to ensure that there are no regressions from the last
> commit the previous benchmark pipeline covered to the final commit of
> this release. I will try to get this result first.
>
>
> Best,
> Zakelly
>
> On Tue, Sep 19, 2023 at 4:55 PM Jing Ge 
> wrote:
> >
> > Hi
> >
> > Thanks Zakelly and Yuan for your effort and update. Since we changed the
> > hardware, IMHO, if we are able to reach a consensus in the community that
> > there is no regression in the benchmarks, we could consider releasing rc1
> > without waiting for the new baseline scores, which might take days.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Sep 19, 2023 at 10:42 AM Yuan Mei 
> wrote:
> >
> > > Hey Zakelly,
> > >
> > > Thanks very much for the efforts to re-build the entire benchmark
> > > environment.
> > >
> > > As long as we
> > > 1) have the pipeline set up and ready (no need for the entire portal),
> > > 2) get benchmark comparison numbers (comparing with the commit just
> > > before the benchmark pipeline went down), and
> > > 3) confirm there is no regression, it should be good enough.
> > >
> > > Thanks again!
> > >
> > > Best
> > > Yuan
> > >
> > > On Tue, Sep 19, 2023 at 4:26 PM Zakelly Lan 
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I am working on rebuilding the benchmark pipeline and it's almost
> > > > done. However, due to the change in machines for benchmarking, I will
> > > > need a few more days to run tests and gather the baseline scores for
> > > > further comparison. Once the pipeline is fully ready, we will proceed
> > > > with the performance test for release 1.18.0.
> > > >
> > > > Please let me know if you have any concerns. Thank you all for your
> > > > patience.
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Mon, Sep 18, 2023 at 6:57 PM Jing Ge 
> > > > wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > The RC0 for Apache Flink 1.18.0 has been created. This RC is
> > > > > currently for preview only, to facilitate integration testing, since
> > > > > the benchmark tests are not available yet[1] and the release
> > > > > announcement is still under review. RC1 will be released after all
> > > > > benchmark tests pass. The related voting process will be triggered
> > > > > once the announcement is ready.
> > > > > RC0 has all the artifacts that we would typically have for a
> > > > > release, except for the release note and the website pull request
> > > > > for the release announcement.
> > > > >
> > > > > The following contents are available for your review:
> > > > >
> > > > > - The preview source release and binary convenience releases [2],
> which
> > > > > are signed with the key with fingerprint 96AE0E32CBE6E0753CE6 [3].
> > > > > - all artifacts that would normally be deployed to the Maven
> > > > > Central Repository [4].
> > > > > - source code tag "release-1.18.0-rc0" [5]
> > > > >
> > > > > Your help testing the release will be greatly appreciated! We'll
> > > > > create the rc1 release and the voting thread as soon as all these
> > > > > efforts are finished.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-33052
> > > > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.18.0-rc0/
> > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > [4] https://repository.apache.org/content/repositories/orgapacheflink-1656/
> > > > > [5] https://github.com/apache/flink/releases/tag/release-1.18.0-rc0
> > > > >
> > > > > Best regards,
> > > > > Qingsheng, Sergei, Konstantin and Jing
> > > >
> > >
>


[jira] [Created] (FLINK-32974) RestClusterClient leaks flink-rest-client-jobgraphs* directories

2023-08-28 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32974:
--

 Summary: RestClusterClient leaks flink-rest-client-jobgraphs* 
directories
 Key: FLINK-32974
 URL: https://issues.apache.org/jira/browse/FLINK-32974
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.18.0
Reporter: Lijie Wang


After FLINK-32226, a temporary directory (named
{{flink-rest-client-jobgraphs*}}) is created whenever a new
RestClusterClient is created, but this directory is never cleaned up.

This results in a large number of {{flink-rest-client-jobgraphs*}} directories
under {{/tmp}}, especially when using CollectDynamicSink/CollectResultFetcher.
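One way to avoid such a leak is to tie the temporary directory's lifetime to the client: create it in the constructor and delete it recursively in close(). This is sketched below as an assumption, not as the actual fix for FLINK-32974. TempDirClientSketch is a hypothetical stand-in for RestClusterClient, and only the directory prefix comes from this issue.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical client that owns a temporary directory and removes it when closed. */
public final class TempDirClientSketch implements AutoCloseable {

    private final Path jobGraphDir;

    TempDirClientSketch() throws IOException {
        // Same prefix as in the issue; the directory is owned by this client instance.
        this.jobGraphDir = Files.createTempDirectory("flink-rest-client-jobgraphs");
    }

    Path jobGraphDir() {
        return jobGraphDir;
    }

    @Override
    public void close() throws IOException {
        if (!Files.exists(jobGraphDir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(jobGraphDir)) {
            // Reverse order deletes files before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = null;
        try (TempDirClientSketch client = new TempDirClientSketch()) {
            dir = client.jobGraphDir();
            Files.write(dir.resolve("job.graph"), new byte[] {1, 2, 3});
        }
        System.out.println(Files.exists(dir)); // false: removed in close()
    }
}
```

Callers that use try-with-resources (or otherwise always call close()) then leave nothing behind under /tmp.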



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32844) Runtime Filter should not be applied if the field is already filtered by DPP

2023-08-11 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32844:
--

 Summary: Runtime Filter should not be applied if the field is 
already filtered by DPP
 Key: FLINK-32844
 URL: https://issues.apache.org/jira/browse/FLINK-32844
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.18.0
Reporter: Lijie Wang


Currently, the runtime filter and DPP may take effect on the same key. In this
case, the runtime filter may be redundant because the data may already have
been filtered out by DPP. We should avoid this because redundant runtime
filters can have negative effects.
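A minimal sketch of the intended check follows. It assumes the planner can tell which probe-side fields DPP already filters, and it treats the runtime filter as redundant when all of its join keys are covered. The method name, the string-based field representation, and the "all keys covered" criterion are hypothetical simplifications, not the planner's actual logic.

```java
import java.util.Set;

/** Hypothetical planner check: skip a runtime filter whose keys DPP already prunes. */
public final class RuntimeFilterDppSketch {

    /**
     * Returns false when every probe-side join key is already filtered by dynamic
     * partition pruning (DPP), in which case a runtime filter on the same keys is
     * likely redundant.
     */
    static boolean shouldApplyRuntimeFilter(Set<String> probeJoinKeys, Set<String> dppFilteredFields) {
        return !dppFilteredFields.containsAll(probeJoinKeys);
    }

    public static void main(String[] args) {
        // Join on the partition column "dt", which DPP already prunes: skip the runtime filter.
        System.out.println(shouldApplyRuntimeFilter(Set.of("dt"), Set.of("dt"))); // false
        // Join on "id", which DPP does not cover: the runtime filter may still help.
        System.out.println(shouldApplyRuntimeFilter(Set.of("id"), Set.of("dt"))); // true
    }
}
```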



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32831) RuntimeFilterProgram should aware join type when looking for the build side

2023-08-10 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32831:
--

 Summary: RuntimeFilterProgram should aware join type when looking 
for the build side
 Key: FLINK-32831
 URL: https://issues.apache.org/jira/browse/FLINK-32831
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Lijie Wang


Currently, the runtime filter program tries to look for an {{Exchange}} as the
build side to avoid affecting {{MultiInput}}. If the original build side is not
an {{Exchange}}, it tries to push down the runtime filter builder.

However, the builder push-down is not aware of the join type, which may lead
to incorrect results (for example, pushing the builder down to the right input
of a left join).

We should only support the following cases:
1. Inner join: the builder can be pushed to both the left and right inputs
2. Semi join: the builder can be pushed to both the left and right inputs
3. Left join: the builder can only be pushed to the left input
4. Right join: the builder can only be pushed to the right input
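The four rules above can be encoded as a simple lookup, sketched below in Java. JoinType, Side and canPushBuilderTo are hypothetical names, not the planner's classes. Any join type outside the list is conservatively treated as not supporting push-down.

```java
/** Hypothetical encoding of the builder push-down rules listed in FLINK-32831. */
public final class BuilderPushDownSketch {

    enum JoinType { INNER, SEMI, LEFT, RIGHT }

    enum Side { LEFT, RIGHT }

    /** Returns whether the runtime filter builder may be pushed into the given join input. */
    static boolean canPushBuilderTo(JoinType joinType, Side side) {
        switch (joinType) {
            case INNER:
            case SEMI:
                return true; // either input
            case LEFT:
                return side == Side.LEFT; // never the right input of a left join
            case RIGHT:
                return side == Side.RIGHT; // never the left input of a right join
            default:
                return false; // other join types: no push-down
        }
    }

    public static void main(String[] args) {
        for (JoinType type : JoinType.values()) {
            System.out.println(type + " -> left: " + canPushBuilderTo(type, Side.LEFT)
                    + ", right: " + canPushBuilderTo(type, Side.RIGHT));
        }
        // INNER -> left: true, right: true
        // SEMI -> left: true, right: true
        // LEFT -> left: true, right: false
        // RIGHT -> left: false, right: true
    }
}
```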



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32827) Operator fusion codegen may not take effect when enable runtime filter

2023-08-10 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32827:
--

 Summary: Operator fusion codegen may not take effect when enable 
runtime filter
 Key: FLINK-32827
 URL: https://issues.apache.org/jira/browse/FLINK-32827
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Lijie Wang


Currently, the RuntimeFilterOperator does not support operator fusion
codegen (OFCG), which means that the runtime filter and OFCG cannot take
effect together. We should fix this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Apache Flink Committer - Hong Teoh

2023-08-04 Thread Lijie Wang
Congratulations, Hong!

Best,
Lijie

On Fri, Aug 4, 2023 at 15:18, Paul Lam wrote:

> Congrats, Hong!
>
> Best,
> Paul Lam
>
> > On Aug 4, 2023, at 14:59, Jark Wu wrote:
> >
> > Congratulations, Hong!
> >
> > Best,
> > Jark
> >
> > On Fri, 4 Aug 2023 at 14:24, Sergey Nuyanzin 
> wrote:
> >
> >> Congratulations, Hong!
> >>
> >> On Fri, Aug 4, 2023 at 7:25 AM Shammon FY  wrote:
> >>
> >>> Congratulations, Hong!
> >>>
> >>> Best,
> >>> Shammon FY
> >>>
> >>> On Fri, Aug 4, 2023 at 12:33 PM Jing Ge 
> >>> wrote:
> >>>
>  congrats! Hong!
> 
>  Best regards,
>  Jing
> 
>  On Fri, Aug 4, 2023 at 11:48 AM Qingsheng Ren 
> >>> wrote:
> 
> > Congratulations and welcome aboard, Hong!
> >
> > Best,
> > Qingsheng
> >
> > On Fri, Aug 4, 2023 at 11:04 AM Matt Wang  wrote:
> >
> >> Congratulations, Hong!
> >>
> >>
> >> --
> >>
> >> Best,
> >> Matt Wang
> >>
> >>
> >>  Replied Message 
> >> | From | Weihua Hu |
> >> | Date | 08/4/2023 10:55 |
> >> | To |  |
> >> | Subject | Re: [ANNOUNCE] New Apache Flink Committer - Hong Teoh |
> >> Congratulations, Hong!
> >>
> >> Best,
> >> Weihua
> >>
> >>
> >> On Fri, Aug 4, 2023 at 10:49 AM Samrat Deb 
> > wrote:
> >>
> >> Congratulations, Hong Teoh
> >>
> >> On Fri, 4 Aug 2023 at 7:52 AM, Benchao Li 
>  wrote:
> >>
> >> Congratulations, Hong!
> >>
> >> On Fri, Aug 4, 2023 at 09:23, yuxia wrote:
> >>
> >> Congratulations, Hong Teoh!
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> - Original Message -
> >> From: "Matthias Pohl"
> >> To: "dev"
> >> Sent: Thursday, August 3, 2023 11:24:39 PM
> >> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Hong Teoh
> >>
> >> Congratulations, Hong! :)
> >>
> >> On Thu, Aug 3, 2023 at 3:39 PM Leonard Xu 
> >> wrote:
> >>
> >> Congratulations, Hong!
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >> On Aug 3, 2023, at 8:45 PM, Jiabao Sun  >> .INVALID>
> >> wrote:
> >>
> >> Congratulations, Hong Teoh!
> >>
> >> Best,
> >> Jiabao Sun
> >>
> >> On Aug 3, 2023, at 7:32 PM, Danny Cranmer wrote:
> >>
> >> On behalf of the PMC, I'm very happy to announce Hong Teoh as a new
> >> Flink Committer.
> >>
> >> Hong has been active in the Flink community for over 1 year and has
> >> played a key role in developing and maintaining AWS integrations, core
> >> connector APIs and, more recently, improvements to the Flink REST API.
> >> Additionally, Hong is a very active community member, supporting users,
> >> participating in discussions on the mailing lists and Flink Slack
> >> channels, and speaking at conferences.
> >>
> >> Please join me in congratulating Hong for becoming a Flink Committer!
> >>
> >> Thanks,
> >> Danny Cranmer (on behalf of the Flink PMC)
> >>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >>
> >>
> >>
> >
> 
> >>>
> >>
> >>
> >> --
> >> Best regards,
> >> Sergey
> >>
>
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Matthias Pohl

2023-08-04 Thread Lijie Wang
Congratulations, Matthias!

Best,
Lijie

On Fri, Aug 4, 2023 at 15:27, Konstantin Knauf wrote:

> Congrats, Matthias!
>
> On Fri, Aug 4, 2023 at 09:15, Paul Lam wrote:
>
> > Congratulations, Matthias!
> >
> > Best,
> > Paul Lam
> >
> > > On Aug 4, 2023, at 15:09, yuxia wrote:
> > >
> > > Congratulations, Matthias!
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - Original Message -
> > > From: "Yun Tang"
> > > To: "dev"
> > > Sent: Friday, August 4, 2023 3:04:52 PM
> > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Matthias Pohl
> > >
> > > Congratulations, Matthias!
> > >
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Jark Wu 
> > > Sent: Friday, August 4, 2023 15:00
> > > To: dev@flink.apache.org 
> > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Matthias Pohl
> > >
> > > Congratulations, Matthias!
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 4 Aug 2023 at 14:59, Weihua Hu  wrote:
> > >
> > >> Congratulations,  Matthias!
> > >>
> > >> Best,
> > >> Weihua
> > >>
> > >>
> > >> On Fri, Aug 4, 2023 at 2:49 PM Yuxin Tan 
> > wrote:
> > >>
> > >>> Congratulations, Matthias!
> > >>>
> > >>> Best,
> > >>> Yuxin
> > >>>
> > >>>
> > >>> On Fri, Aug 4, 2023 at 14:21, Sergey Nuyanzin wrote:
> > >>>
> >  Congratulations, Matthias!
> >  Well deserved!
> > 
> >  On Fri, Aug 4, 2023 at 7:59 AM liu ron  wrote:
> > 
> > > Congrats, Matthias!
> > >
> > > Best,
> > > Ron
> > >
> > > On Fri, Aug 4, 2023 at 13:24, Shammon FY wrote:
> > >
> > >> Congratulations, Matthias!
> > >>
> > >> On Fri, Aug 4, 2023 at 1:13 PM Samrat Deb 
> >  wrote:
> > >>
> > >>> Congrats, Matthias!
> > >>>
> > >>>
> > >>> On Fri, 4 Aug 2023 at 10:13 AM, Benchao Li  > >>>
> > > wrote:
> > >>>
> >  Congratulations, Matthias!
> > 
> >  On Fri, Aug 4, 2023 at 12:35, Jing Ge wrote:
> > 
> > > Congrats! Matthias!
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Fri, Aug 4, 2023 at 12:09 PM Yangze Guo <
> > >> karma...@gmail.com
> > 
> > >> wrote:
> > >
> > >> Congrats, Matthias!
> > >>
> > >> Best,
> > >> Yangze Guo
> > >>
> > >> On Fri, Aug 4, 2023 at 11:44 AM Qingsheng Ren <
> >  re...@apache.org>
> >  wrote:
> > >>>
> > >>> Congratulations, Matthias! This is absolutely well
> > >>> deserved.
> > >>>
> > >>> Best,
> > >>> Qingsheng
> > >>>
> > >>> On Fri, Aug 4, 2023 at 11:31 AM Rui Fan <
> >  1996fan...@gmail.com>
> >  wrote:
> > >>>
> >  Congratulations Matthias, well deserved!
> > 
> >  Best,
> >  Rui Fan
> > 
> >  On Fri, Aug 4, 2023 at 11:30 AM Leonard Xu <
> > > xbjt...@gmail.com>
> > > wrote:
> > 
> > > Congratulations,  Matthias.
> > >
> > > Well deserved ^_^
> > >
> > > Best,
> > > Leonard
> > >
> > >
> > >> On Aug 4, 2023, at 11:18 AM, Xintong Song <
> >  tonysong...@gmail.com
> > >>
> >  wrote:
> > >>
> > >> Hi everyone,
> > >>
> > >> On behalf of the PMC, I'm very happy to announce that Matthias Pohl
> > >> has joined the Flink PMC!
> > >>
> > >> Matthias has been consistently contributing to the project since Sep
> > >> 2020, and became a committer in Dec 2021. He mainly works in Flink's
> > >> distributed coordination and high availability areas. He has worked on
> > >> many FLIPs, including FLIP-195/270/285. He helped a lot with release
> > >> management, being one of the Flink 1.17 release managers, and is also
> > >> very active in the Flink 1.18 / 2.0 efforts. He also contributed a lot
> > >> to improving the build stability.
> > >>
> > >> Please join me in congratulating Matthias!
> > >>
> > >> Best,
> > >>
> > >> Xintong (on behalf of the Apache Flink PMC)
> > >
> > >
> > 
> > >>
> > >
> > 
> > 
> >  --
> > 
> >  Best,
> >  Benchao Li
> > 
> > >>>
> > >>
> > >
> > 
> > 
> >  --
> >  Best regards,
> >  Sergey
> > 
> > >>>
> > >>
> >
> >
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>


Re: [ANNOUNCE] New Apache Flink Committer - Weihua Hu

2023-08-04 Thread Lijie Wang
Congratulations, Weihua!

Best,
Lijie

On Fri, Aug 4, 2023 at 15:14, yuxia wrote:

> Congratulations, Weihua!
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Yun Tang"
> To: "dev"
> Sent: Friday, August 4, 2023 3:05:30 PM
> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Weihua Hu
>
> Congratulations, Weihua!
>
>
> Best
> Yun Tang
> 
> From: Jark Wu 
> Sent: Friday, August 4, 2023 15:00
> To: dev@flink.apache.org 
> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Weihua Hu
>
> Congratulations, Weihua!
>
> Best,
> Jark
>
> On Fri, 4 Aug 2023 at 14:48, Yuxin Tan  wrote:
>
> > Congratulations Weihua!
> >
> > Best,
> > Yuxin
> >
> >
> > On Fri, Aug 4, 2023 at 14:28, Junrui Lee wrote:
> >
> > > Congrats, Weihua!
> > > Best,
> > > Junrui
> > >
> > > On Fri, Aug 4, 2023 at 14:25, Geng Biao wrote:
> > >
> > > > Congrats, Weihua!
> > > > Best,
> > > > Biao Geng
> > > >
> > > > Sent from Outlook for iOS
> > > >
> > > > From: 周仁祥
> > > > Sent: Friday, August 4, 2023 2:23:42 PM
> > > > To: dev@flink.apache.org
> > > > Cc: Weihua Hu
> > > > Subject: Re: [ANNOUNCE] New Apache Flink Committer - Weihua Hu
> > > >
> > > > Congratulations, Weihua~
> > > >
> > > > > On Aug 4, 2023, at 14:21, Sergey Nuyanzin wrote:
> > > > >
> > > > > Congratulations, Weihua!
> > > > >
> > > > > On Fri, Aug 4, 2023 at 8:03 AM Chen Zhanghao <zhanghao.c...@outlook.com> wrote:
> > > > >
> > > > >> Congratulations, Weihua!
> > > > >>
> > > > >> Best,
> > > > >> Zhanghao Chen
> > > > >> 
> > > > >> From: Xintong Song
> > > > >> Sent: Aug 4, 2023 11:18
> > > > >> To: dev
> > > > >> Cc: Weihua Hu
> > > > >> Subject: [ANNOUNCE] New Apache Flink Committer - Weihua Hu
> > > > >>
> > > > >> Hi everyone,
> > > > >>
> > > > >> On behalf of the PMC, I'm very happy to announce Weihua Hu as a new
> > > > >> Flink Committer!
> > > > >>
> > > > >> Weihua has been consistently contributing to the project since May
> > > > >> 2022. He mainly works in Flink's distributed coordination areas. He is
> > > > >> the main contributor of FLIP-298 and many other improvements in
> > > > >> large-scale job scheduling. He is also quite active in mailing lists,
> > > > >> participating in discussions and answering user questions.
> > > > >>
> > > > >> Please join me in congratulating Weihua!
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> Xintong (on behalf of the Apache Flink PMC)
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > Best regards,
> > > > > Sergey
> > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-32680) Job vertex names get messed up once there is a source vertex chained with a MultipleInput vertex in job graph

2023-07-26 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32680:
--

 Summary: Job vertex names get messed up once there is a source vertex chained with a MultipleInput vertex in job graph
 Key: FLINK-32680
 URL: https://issues.apache.org/jira/browse/FLINK-32680
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: Lijie Wang
 Attachments: image-2023-07-26-15-01-51-886.png, 
image-2023-07-26-15-23-29-551.png, image-2023-07-26-15-24-24-077.png

Take the following test (put it in {{MultipleInputITCase}}) as an example:
{code:java}
@Test
public void testMultipleInputDoesNotChainedWithSource() throws Exception {
    testJobVertexName(false);
}

@Test
public void testMultipleInputChainedWithSource() throws Exception {
    testJobVertexName(true);
}

private void testJobVertexName(boolean chain) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    TestListResultSink<Long> resultSink = new TestListResultSink<>();

    DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
    DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
    DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");

    KeyedMultipleInputTransformation<Long> transform =
            new KeyedMultipleInputTransformation<>(
                    "MultipleInput",
                    new KeyedSumMultipleInputOperatorFactory(),
                    BasicTypeInfo.LONG_TYPE_INFO,
                    1,
                    BasicTypeInfo.LONG_TYPE_INFO);
    if (chain) {
        // Let the sources be chained into the MultipleInput operator.
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
    }
    KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;

    env.addOperator(
            transform
                    .addInput(source1.getTransformation(), keySelector)
                    .addInput(source2.getTransformation(), keySelector)
                    .addInput(source3.getTransformation(), keySelector));

    new MultipleConnectedStreams(env)
            .transform(transform)
            .rebalance()
            .addSink(resultSink)
            .name("sink");

    env.execute();
}{code}
 

When we run {{testMultipleInputDoesNotChainedWithSource}}, all job vertex
names are as expected.

!image-2023-07-26-15-24-24-077.png|width=494,height=246!

When we run {{testMultipleInputChainedWithSource}}, the job vertex names get
messed up (all names contain {{Source: source1}}). I think it's a bug.

!image-2023-07-26-15-23-29-551.png|width=515,height=182!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32492) Introduce FlinkRuntimeFilterProgram to inject runtime filter

2023-06-29 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32492:
--

 Summary: Introduce FlinkRuntimeFilterProgram to inject runtime filter
 Key: FLINK-32492
 URL: https://issues.apache.org/jira/browse/FLINK-32492
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Lijie Wang
Assignee: Lijie Wang
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32486) FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-29 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32486:
--

 Summary: FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
 Key: FLINK-32486
 URL: https://issues.apache.org/jira/browse/FLINK-32486
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Lijie Wang
Assignee: Lijie Wang
 Fix For: 1.18.0


This is an umbrella ticket for 
[FLIP-324|https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-29 Thread Lijie Wang
Hi all,
I'm happy to announce that FLIP-324 [1] has been approved unanimously.
According to the vote thread [2], there are 10 approving votes, of
which 7 are binding:

- Jing Ge (binding)
- Rui Fan (binding)
- Zhu Zhu (binding)
- Yuepeng Pan (non-binding)
- Yuxia (binding)
- Xia Sun (non-binding)
- Jark Wu (binding)
- Yangze Guo (binding)
- Yuxin Tan (non-binding)
- Benchao Li (binding)

And no disapproving ones.

Thanks all for participating!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
[2] https://lists.apache.org/thread/60c0obrgxrcxb7qv9pqywzxvtt5phnhy

Best,
Lijie.


Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-29 Thread Lijie Wang
Thanks for everyone's votes!
Closing this vote now and following up with the result in a separate email.

Best,
Lijie

On Sun, Jun 25, 2023 at 12:48, Benchao Li wrote:

> +1 (binding)
>
> On Sun, Jun 25, 2023 at 12:27, Yuxin Tan wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Yuxin
> >
> >
> > On Sun, Jun 25, 2023 at 12:21, Yangze Guo wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Sun, Jun 25, 2023 at 11:41 AM Jark Wu  wrote:
> > > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > > On Jun 25, 2023, at 10:04, Xia Sun wrote:
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best Regards,
> > > > >
> > > > > Xia
> > > > >
> > > > > On Sun, Jun 25, 2023 at 09:23, yuxia wrote:
> > > > >
> > > > >> +1 (binding)
> > > > >> Thanks Lijie driving it.
> > > > >>
> > > > >> Best regards,
> > > > >> Yuxia
> > > > >>
> > > > >> - Original Message -
> > > > >> From: "Yuepeng Pan"
> > > > >> To: "dev"
> > > > >> Sent: Saturday, Jun 24, 2023 9:06:53 PM
> > > > >> Subject: Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
> > > > >>
> > > > >> +1 (non-binding)
> > > > >>
> > > > >> Thanks,
> > > > >> Yuepeng Pan
> > > > >>
> > > > >> At 2023-06-23 23:49:53, "Lijie Wang" wrote:
> > > > >>> Hi all,
> > > > >>>
> > > > >>> Thanks for all the feedback about FLIP-324: Introduce Runtime
> > > > >>> Filter for Flink Batch Jobs [1]. This FLIP was discussed in [2].
> > > > >>>
> > > > >>> I'd like to start a vote for it. The vote will be open for at least
> > > > >>> 72 hours (until June 29th 12:00 GMT) unless there is an objection or
> > > > >>> insufficient votes.
> > > > >>>
> > > > >>> [1]
> > > > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > > > >>> [2] https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg
> > > > >>>
> > > > >>> Best,
> > > > >>> Lijie
> > > > >>
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


[VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-23 Thread Lijie Wang
Hi all,

Thanks for all the feedback about FLIP-324: Introduce Runtime Filter
for Flink Batch Jobs [1]. This FLIP was discussed in [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours (until June 29th 12:00 GMT) unless there is an objection or
insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
[2] https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg

Best,
Lijie


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-22 Thread Lijie Wang
hich is beneficial to the join
> > > > operator, so whether to insert the RuntimeFilter or not is not dependent
> > > > on the parallelism.
> > > >
> > > > > Does it make sense to reconsider the formula of ratio
> > > > > calculation to help users easily control the filter injection?
> > > >
> > > > Only when ndv does not exist will row count be considered. When size
> > > > uses the default value and ndv cannot be obtained, it is true that this
> > > > condition may always hold, but this does not seem to affect anything,
> > > > and the user is also likely to change the value of the size. One
> > > > question: how do you think we should make it easier for users to
> > > > control the filter injection?
> > > >
> > > >
> > > > [1]:
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > On Tue, Jun 20, 2023 at 07:11, Jing Ge wrote:
> > > >
> > > > > Hi Lijie,
> > > > >
> > > > > Thanks for your proposal. It is a really nice feature. I'd like to
> > > > > ask a few questions to understand your thoughts.
> > > > >
> > > > > Afaiu, the runtime filter will only be injected when the gap between
> > > > > the build data size and probe data size is big enough. Let's take an
> > > > > extreme example: the small table (build side) has one row and the
> > > > > large table (probe side) contains tens of billions of rows. This
> > > > > would be the ideal use case for the runtime filter and the
> > > > > improvement would be significant. Is this correct?
> > > > >
> > > > > Speaking of the "Conditions of injecting Runtime Filter" in the
> > > > > FLIP, will the values of max-build-data-size and min-probe-data-size
> > > > > depend on the parallelism config? I.e., with the same data-size
> > > > > setting, is it possible to inject or not inject runtime filters by
> > > > > adjusting the parallelism?
> > > > >
> > > > > In the FLIP, there are default values for the new configuration
> > > > > parameters that will be used to check the injection condition. If
> > > > > ndv cannot be estimated, row count will be used. Given the
> > > > > max-build-data-size is 10MB and the min-probe-data-size is 10GB, in
> > > > > the worst case, the min-filter-ratio will be 0.999, i.e. the
> > > > > probeNdv is 1000 times buildNdv. If we consider the duplication and
> > > > > the fact that the large table might have more columns than the small
> > > > > table, the probeNdv should still be 100 or 10 times buildNdv, which
> > > > > ends up with a min-filter-ratio equal to 0.99 or 0.9. Both are bigger
> > > > > than the default value 0.5 in the FLIP. If I am not mistaken,
> > > > > commonly, a min-filter-ratio less than 0.99 will always allow
> > > > > injecting the runtime filter. Does it make sense to reconsider the
> > > > > formula of ratio calculation to help users easily control the filter
> > > > > injection?
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Stefan,
> > > > > >
> > > > > > >> bypassing the dataflow
> > > > > > I believe it's a possible solution, but it may require more
> > > > > > coordination and extra prerequisites (such as a DFS), so I think it
> > > > > > should be excluded from the first version. I'll add it to
> > > > > > Future+Improvements as a potential improvement.
> > > > > >
> > > >
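The injection condition debated above can be sketched as follows. This is a hypothetical model, not Flink's planner code: the class name, the 10 MB / 10 GB / 0.5 thresholds (the defaults quoted in this thread) and the ratio formula 1 - buildNdv / probeNdv (chosen because it reproduces the 0.999, 0.99 and 0.9 figures Jing derives) are all assumptions.

```java
/**
 * Hypothetical sketch of the FLIP-324 injection check as discussed in the
 * thread. Thresholds and the ratio formula are assumptions, not planner code.
 */
final class RuntimeFilterInjectionSketch {

    // Assumed defaults quoted in the thread.
    static final long MAX_BUILD_DATA_SIZE = 10L * 1024 * 1024;        // 10 MB
    static final long MIN_PROBE_DATA_SIZE = 10L * 1024 * 1024 * 1024; // 10 GB
    static final double MIN_FILTER_RATIO = 0.5;

    private RuntimeFilterInjectionSketch() {}

    /** Estimated fraction of probe-side rows the filter would drop. */
    static double filterRatio(double buildCount, double probeCount) {
        if (probeCount <= 0) {
            return 0.0;
        }
        return Math.max(0.0, 1.0 - buildCount / probeCount);
    }

    /**
     * Uses NDV when both sides have it and falls back to row counts otherwise,
     * as described in the thread. A null NDV means "statistic unavailable".
     */
    static boolean shouldInject(
            long buildSizeBytes,
            long probeSizeBytes,
            Double buildNdv,
            Double probeNdv,
            double buildRows,
            double probeRows) {
        if (buildSizeBytes > MAX_BUILD_DATA_SIZE || probeSizeBytes < MIN_PROBE_DATA_SIZE) {
            return false;
        }
        boolean haveNdv = buildNdv != null && probeNdv != null;
        double ratio = haveNdv
                ? filterRatio(buildNdv, probeNdv)
                : filterRatio(buildRows, probeRows);
        return ratio >= MIN_FILTER_RATIO;
    }
}
```

For example, a 5 MB build side against a 20 GB probe side with no NDV and 1,000 vs. 1,000,000 rows gives a ratio of 0.999, so the filter is injected. This mirrors Jing's observation that, with row counts only, passing the two size thresholds almost always implies passing the ratio check as well.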

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-20 Thread Lijie Wang
Hi all,

I've updated the FLIP according to the above discussion. The main changes
are:
- Declare the supported shuffle types.
- Build the runtime filters using a two-phase approach.
- Add more potential future improvements.

Best,
Lijie

On Tue, Jun 20, 2023 at 11:19, liu ron wrote:

> Hi, Jing
>
> Thanks for your feedback.
>
> > Afaiu, the runtime filter will only be injected when the gap between the
> > build data size and probe data size is big enough. Let's take an extreme
> > example: the small table (build side) has one row and the large table
> > (probe side) contains tens of billions of rows. This would be the ideal
> > use case for the runtime filter and the improvement would be significant.
> > Is this correct?
>
> Yes, you are right.
>
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will
> > the values of max-build-data-size and min-probe-data-size depend on the
> > parallelism config? I.e., with the same data-size setting, is it possible
> > to inject or not inject runtime filters by adjusting the parallelism?
>
> First, let me clarify two points. The first is that RuntimeFilter decides
> whether to inject or not in the optimization phase, but we do not consider
> operator parallelism in the SQL optimization phase currently, which is set
> at the ExecNode level. The second is that in batch mode, the default
> AdaptiveBatchScheduler[1] is now used, which will derive the parallelism of
> the downstream operator based on the amount of data produced by the
> upstream operator, that is, the parallelism is determined by runtime
> adaptation. In the above case, we cannot decide whether to inject
> BloomFilter in the optimization stage based on parallelism.
> A more important point is that the purpose of Runtime Filter is to reduce
> the amount of data for shuffle, and thus the amount of data processed by
> the downstream join operator. Therefore, I understand that regardless of
> the parallelism of the probe, the amount of data in the shuffle must be
> reduced after inserting the Runtime Filter, which is beneficial to the join
> operator, so whether to insert the RuntimeFilter or not is not dependent on
> the parallelism.
>
> > Does it make sense to reconsider the formula of ratio
> > calculation to help users easily control the filter injection?
>
> Only when ndv does not exist will row count be considered. When size uses
> the default value and ndv cannot be obtained, it is true that this condition
> may always hold, but this does not seem to affect anything, and the user is
> also likely to change the value of the size. One question: how do you think
> we should make it easier for users to control the filter injection?
>
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
>
> Best,
> Ron
>
> On Tue, Jun 20, 2023 at 07:11, Jing Ge wrote:
>
> > Hi Lijie,
> >
> > Thanks for your proposal. It is a really nice feature. I'd like to ask a
> > few questions to understand your thoughts.
> >
> > Afaiu, the runtime filter will only be injected when the gap between the
> > build data size and probe data size is big enough. Let's take an extreme
> > example: the small table (build side) has one row and the large table
> > (probe side) contains tens of billions of rows. This would be the ideal
> > use case for the runtime filter and the improvement would be significant.
> > Is this correct?
> >
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will
> > the values of max-build-data-size and min-probe-data-size depend on the
> > parallelism config? I.e., with the same data-size setting, is it possible
> > to inject or not inject runtime filters by adjusting the parallelism?
> >
> > In the FLIP, there are default values for the new configuration parameters
> > that will be used to check the injection condition. If ndv cannot be
> > estimated, row count will be used. Given the max-build-data-size is 10MB
> > and the min-probe-data-size is 10GB, in the worst case, the
> > min-filter-ratio will be 0.999, i.e. the probeNdv is 1000 times buildNdv.
> > If we consider the duplication and the fact that the large table might
> > have more columns than the small table, the probeNdv should still be 100
> > or 10 times buildNdv, which ends up with a min-filter-ratio equal to 0.99
> > or 0.9. Both are bigger than the default value 0.5 in the FLIP. If I am
> > not mistaken, commonly, a min-filter-ratio less than 0.99 will always
> > allow injecting the runtime filter. Does it make sense to reconsider the
> > formula of ratio
> &
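Ron's point that parallelism is only known at runtime can be illustrated with a simplified, hypothetical model of how an adaptive batch scheduler sizes a consumer from the bytes its producers actually wrote. The class, method and parameter names are illustrative assumptions; Flink's AdaptiveBatchScheduler applies further rules (see the adaptive batch scheduler documentation linked in Ron's reply).

```java
/**
 * Simplified, hypothetical model of adaptive parallelism: the consumer's
 * parallelism is derived from the data volume its producers emitted, so it
 * does not exist yet when the SQL planner decides on runtime filters.
 */
final class AdaptiveParallelismSketch {

    private AdaptiveParallelismSketch() {}

    static int decideParallelism(
            long producedBytes, long avgBytesPerTask, int minParallelism, int maxParallelism) {
        if (avgBytesPerTask <= 0 || minParallelism < 1 || maxParallelism < minParallelism) {
            throw new IllegalArgumentException("invalid scheduler settings");
        }
        // Ceiling division: enough tasks that none gets more than the target volume.
        long tasks = (producedBytes + avgBytesPerTask - 1) / avgBytesPerTask;
        // Clamp into the configured [min, max] range.
        return (int) Math.max(minParallelism, Math.min(maxParallelism, tasks));
    }
}
```

For example, 100 MB produced with an assumed 16 MB per-task target yields 7 tasks. A runtime filter shrinks the probe-side bytes before such a decision is made, which is why the injection decision is taken on data sizes rather than on a parallelism that has not been chosen yet.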

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
Hi Stefan,

>> bypassing the dataflow
I believe it's a possible solution, but it may require more coordination
and extra prerequisites (such as a DFS), so I think it should be excluded
from the first version. I'll add it to Future+Improvements as a potential
improvement.

Thanks again for your quick reply :)

Best,
Lijie

On Mon, Jun 19, 2023 at 20:51, Stefan Richter wrote:

>
> Hi Lijie,
>
> I think you understood me correctly. But I would not consider this a true
> cyclic dependency in the dataflow because I would not suggest to send the
> filter through an edge in the job graph from join to scan. I’d rather
> bypass the stream graph for bringing the filter to the scan. For
> example, the join could report the filter after the build phase, e.g. to
> the JM or a predefined DFS folder. And when the probe scan is scheduled,
> the JM provides the filter information to the scan when it gets scheduled
> for execution or the scan looks in DFS if it can find any filter that it
> can use as part of initialization. I’m not suggesting to do it exactly in
> those ways, but just to show what I mean by "bypassing the dataflow".
>
> Anyways, I’m fine with excluding this optimization from the current FLIP
> if you believe it would be hard to implement in Flink.
>
> Best,
> Stefan
>
>
> > On 19. Jun 2023, at 14:07, Lijie Wang  wrote:
> >
> > Hi Stefan,
> >
> > If I understand correctly (I hope so), the hash join operator needs to
> > send the bloom filter to the probe scan, and the probe scan also needs to
> > send the filtered data to the hash join operator. This means there would
> > be a cycle in the data flow, and it would be hard for current Flink to
> > schedule this kind of graph. I admit we could find a way to do this, but
> > that's probably a bit outside the scope of this FLIP. So let's do these
> > complex optimizations later, WDYT?
> >
> > Best,
> > Lijie
> >
> > On Mon, Jun 19, 2023 at 18:15, Stefan Richter <srich...@confluent.io.invalid> wrote:
> >
> >> Hi Lijie,
> >>
> >> Exactly, my proposal was to build the bloom filter in the hash operator.
> >> I don’t know about all the details about the implementation of Flink’s
> >> join operator, but I’d assume that even if the join is a two input
> >> operator it gets scheduled for 2 different pipelines. First the build
> >> phase with the scan from the dimension table and after that’s completed
> >> the probe phase with the scan of the fact table. I’m not proposing to use
> >> the bloom filter only in the join operator, but rather send the bloom
> >> filter to the probe scan before starting the probe. I assume this would
> >> require some form of side channel to transport the filter and
> >> coordination to tell the sources that such a filter is available. I
> >> cannot answer how hard those would be to implement, but the idea doesn’t
> >> seem impossible to me.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>> On 19. Jun 2023, at 11:56, Lijie Wang wrote:
> >>>
> >>> Hi Stefan,
> >>>
> >>> Now I know what you mean about point 1. But it is currently not
> >>> feasible in Flink, because the hash table is built inside the hash join
> >>> operator. The hash join operator has two inputs: it first processes the
> >>> data of the build input to build a hash table, and then uses the hash
> >>> table to process the data of the probe input. If we want to use the
> >>> built hash table to deduplicate data for the bloom filter, we must put
> >>> the bloom filter inside the hash join operator. However, in this way,
> >>> the data reaching the join operator cannot be reduced (the
> >>> shuffle/network overhead cannot be reduced), which is not what we
> >>> expected.
> >>>
> >>> Regarding the filter type, I agree with you: more types of filters can
> >>> enable further optimization, and this is in our future plans (we
> >>> described it in the section
> >>> Future+Improvements#More+underlying+implementations).
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> On Mon, Jun 19, 2023 at 15:58, Stefan Richter <srich...@confluent.io.invalid> wrote:
> >>>
> >>>>
> >>>> Hi Lijie,
> >>>>
> >>>> thanks for your response, I agree wi

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
Hi Stefan,

If I understand correctly (I hope so), the hash join operator needs to send
the bloom filter to the probe scan, and the probe scan also needs to send the
filtered data to the hash join operator. This means there would be a cycle in
the data flow, and it would be hard for current Flink to schedule this kind of
graph. I admit we could find a way to do this, but that's probably a bit
outside the scope of this FLIP. So let's do these complex optimizations later,
WDYT?
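The scheduling problem described above can be shown with a plain topological sort (Kahn's algorithm). In this hypothetical sketch, the vertex names and the adjacency-map encoding are assumptions, not Flink's scheduler: adding a hashJoin -> probeScan edge to carry the bloom filter leaves probeScan and hashJoin each waiting on the other, so no upstream-first order exists.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: a dataflow cycle leaves no valid schedule order. */
final class DataflowCycleSketch {

    private DataflowCycleSketch() {}

    /** Returns an upstream-first order, or an empty list if the graph has a cycle. */
    static List<String> scheduleOrder(Map<String, List<String>> edges) {
        // Count incoming edges for every vertex, including pure sinks.
        Map<String, Integer> inDegree = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Vertices with no pending inputs can be scheduled right away.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((vertex, degree) -> {
            if (degree == 0) {
                ready.add(vertex);
            }
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            order.add(vertex);
            for (String target : edges.getOrDefault(vertex, List.of())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        // Unscheduled vertices are stuck waiting on each other.
        return order.size() == inDegree.size() ? order : List.of();
    }
}
```

With only buildScan -> hashJoin and probeScan -> hashJoin the method returns an order ending in hashJoin; adding hashJoin -> probeScan makes it return an empty list.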

Best,
Lijie

On Mon, Jun 19, 2023 at 18:15, Stefan Richter wrote:

> Hi Lijie,
>
> Exactly, my proposal was to build the bloom filter in the hash operator. I
> don’t know about all the details about the implementation of Flink’s join
> operator, but I’d assume that even if the join is a two input operator it
> gets scheduled for 2 different pipelines. First the build phase with the
> scan from the dimension table and after that’s completed the probe phase
> with the scan of the fact table. I’m not proposing to use the bloom filter
> only in the join operator, but rather send the bloom filter to the probe
> scan before starting the probe. I assume this would require some form of
> side channel to transport the filter and coordination to tell the sources
> that such a filter is available. I cannot answer how hard those would be to
> implement, but the idea doesn’t seem impossible to me.
>
> Best,
> Stefan
>
>
> > On 19. Jun 2023, at 11:56, Lijie Wang  wrote:
> >
> > Hi Stefan,
> >
> > Now I know what you mean about point 1. But it is currently not feasible
> > in Flink, because the hash table is built inside the hash join operator.
> > The hash join operator has two inputs: it first processes the data of the
> > build input to build a hash table, and then uses the hash table to process
> > the data of the probe input. If we want to use the built hash table to
> > deduplicate data for the bloom filter, we must put the bloom filter inside
> > the hash join operator. However, in this way, the data reaching the join
> > operator cannot be reduced (the shuffle/network overhead cannot be
> > reduced), which is not what we expected.
> >
> > Regarding the filter type, I agree with you: more types of filters can
> > enable further optimization, and this is in our future plans (we described
> > it in the section Future+Improvements#More+underlying+implementations).
> >
> > Best,
> > Lijie
> >
> > On Mon, Jun 19, 2023 at 15:58, Stefan Richter <srich...@confluent.io.invalid> wrote:
> >
> >>
> >> Hi Lijie,
> >>
> >> thanks for your response, I agree with what you said about points 2 and
> >> 3. Let me explain a bit more about point 1. This would not apply to all
> >> types of joins and my suggestion is also *not* to build a hash table only
> >> for the purpose to build the bloom filter.
> >> I was thinking about the scenario of a hash join, where you would build
> >> the hash table as part of the join algorithm anyways and then use the
> >> keyset of that hash table to 1) have better insights into NDV and 2) be
> >> able to construct the bloom filter without duplicates and therefore
> >> faster. So the preconditions where I would use this is if you are building
> >> a hash table as part of the join and you know you are not building for a
> >> key column (because there would be no duplicates to eliminate). Then your
> >> bloom filter construction could benefit already from the deduplication
> >> work that was done for building the hash table.
> >>
> >> I also wanted to point out that besides bloom filter and IN filter you
> >> could also think of other types of filter that can become interesting for
> >> certain distributions and meta data. For example, if you have min/max
> >> information about columns and partitions you could have a bit vector
> >> represent equilibrium-sized ranges of the key space between min and max
> >> and have the bits represent what part of the range is present and push
> >> that information down to the scan.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>> On 19. Jun 2023, at 08:26, Lijie Wang wrote:
> >>>
> >>> Hi Stefan,
> >>>
> >>> Thanks for your feedback. Let me briefly summarize the optimization
> >>> points you mentioned above (please correct me if I'm wrong):
> >>>
> >>> 1. Build an extra hash table for deduplication before building the
> >>> bloom filter.
> >>>

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
Hi Stefan,

Now I know what you mean about point 1. But it is currently not feasible in
Flink, because the hash table is built inside the hash join operator. The
hash join operator has two inputs: it first processes the data of the build
input to build a hash table, and then uses the hash table to process the data
of the probe input. If we want to use the built hash table to deduplicate
data for the bloom filter, we must put the bloom filter inside the hash join
operator. However, in this way, the data reaching the join operator cannot be
reduced (the shuffle/network overhead cannot be reduced), which is not what
we expected.
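A minimal, single-threaded sketch of the two phases described above, assuming rows are {key, value} pairs of longs. The class and method names are hypothetical; this is not Flink's hash join operator. The comment in phase 2 marks the problem: by the time the hash table exists, every probe row has already been shuffled to the join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a hash join's build phase followed by its probe phase. */
final class HashJoinSketch {

    private HashJoinSketch() {}

    /** Equi-joins build and probe rows on row[0]; each row is {key, value}. */
    static List<String> join(List<long[]> buildRows, List<long[]> probeRows) {
        // Phase 1: build. The probe input is not consumed until this completes.
        Map<Long, List<Long>> table = new HashMap<>();
        for (long[] row : buildRows) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        // Phase 2: probe. Every probe row has already crossed the network to get
        // here, including rows whose key has no match in the table.
        List<String> out = new ArrayList<>();
        for (long[] row : probeRows) {
            for (long buildValue : table.getOrDefault(row[0], List.of())) {
                out.add(row[0] + ":" + buildValue + "," + row[1]);
            }
        }
        return out;
    }
}
```

Joining build rows {1,10}, {2,20} with probe rows {1,100}, {3,300}, {1,101} returns ["1:10,100", "1:10,101"]; the probe row with key 3 is shuffled and then discarded inside the join, which is exactly the network cost a filter applied at the probe scan aims to avoid.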

Regarding the filter type, I agree with you: more types of filters can
enable further optimization, and this is in our future plans (we described
it in the section Future+Improvements#More+underlying+implementations).

Best,
Lijie

On Mon, Jun 19, 2023 at 15:58, Stefan Richter wrote:

>
> Hi Lijie,
>
> thanks for your response, I agree with what you said about points 2 and 3.
> Let me explain a bit more about point 1. This would not apply to all types
> of joins and my suggestion is also *not* to build a hash table only for the
> purpose to build the bloom filter.
> I was thinking about the scenario of a hash join, where you would build
> the hash table as part of the join algorithm anyways and then use the
> keyset of that hash table to 1) have better insights into NDV and 2) be
> able to construct the bloom filter without duplicates and therefore faster.
> So the preconditions where I would use this is if you are building a hash
> table as part of the join and you know you are not building for a key
> column (because there would be no duplicates to eliminate). Then your bloom
> filter construction could benefit already from the deduplication work that
> was done for building the hash table.
>
> I also wanted to point out that besides bloom filter and IN filter you
> could also think of other types of filter that can become interesting for
> certain distributions and meta data. For example, if you have min/max
> information about columns and partitions you could have a bit vector
> represent equilibrium-sized ranges of the key space between min and max and
> have the bits represent what part of the range is present and push that
> information down to the scan.
>
> Best,
> Stefan
>
>
> > On 19. Jun 2023, at 08:26, Lijie Wang  wrote:
> >
> > Hi Stefan,
> >
> > Thanks for your feedback. Let me briefly summarize the optimization
> > points you mentioned above (please correct me if I'm wrong):
> >
> > 1. Build an extra hash table for deduplication before building the bloom
> > filter.
> > 2. Use the two-phase approach to build the bloom filter (first local, then
> > OR-combine).
> > 3. Use blocked bloom filters to improve the cache efficiency.
> >
> > For the above 3 points, I have the following questions or opinions:
> >
> > For point 1, it seems that building a hash table also requires traversing
> > all build-side data, and the overhead seems to be the same as building a
> > bloom filter directly? In addition, the hash table will take up more space
> > when the amount of data is large, which is why we chose to use a bloom
> > filter instead of a hash table.
> >
> > For point 2, I think it's a good idea to use the two-phase approach to
> > build the bloom filter. But rather than directly broadcasting the local
> > bloom filters to the probe side, I prefer to introduce a global node for
> > the OR-combine (like two-phase aggregation [1]), then broadcast the
> > combined bloom filter to the probe side. The latter can reduce the amount
> > of data transferred over the network. I will change the FLIP accordingly.
> >
> > For point 3, I think it's a nice optimization, but I prefer to put it in
> > the future improvements. There is already an implementation of a bloom
> > filter in Flink that we can simply reuse. Introducing a new bloom filter
> > implementation adds complexity (we need to implement it, test it, etc.),
> > and it is not the focus of this FLIP.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> >
> > Best,
> > Lijie
> >
> > On Fri, Jun 16, 2023 at 16:45, Stefan Richter <srich...@confluent.io.invalid> wrote:
> >
> >> Hi,
> >>
> >> Thanks for the proposal of this feature! I have a question about the
> >> filter build and some suggestions for potential improvements. First, I
> >> wonder why you 
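Stefan's min/max idea, quoted earlier in this message, can be sketched as a range bitmap: split [min, max] of the build keys into equally sized buckets (assuming that is what "equilibrium-sized" means), set a bit for each bucket that holds at least one build key, and let the probe scan drop keys outside [min, max] or in empty buckets. This is a hypothetical illustration with assumed names, not part of FLIP-324. Like a bloom filter, it can return false positives but never false negatives.

```java
import java.util.BitSet;

/** Hypothetical min/max range bitmap filter over long join keys. */
final class RangeBitmapFilterSketch {
    private final long min;
    private final long max;
    private final int numBuckets;
    private final BitSet present;

    RangeBitmapFilterSketch(long[] buildKeys, int numBuckets) {
        if (buildKeys.length == 0 || numBuckets < 1) {
            throw new IllegalArgumentException("need build keys and at least one bucket");
        }
        long lo = Long.MAX_VALUE;
        long hi = Long.MIN_VALUE;
        for (long key : buildKeys) {
            lo = Math.min(lo, key);
            hi = Math.max(hi, key);
        }
        this.min = lo;
        this.max = hi;
        this.numBuckets = numBuckets;
        this.present = new BitSet(numBuckets);
        // Mark every bucket that contains at least one build-side key.
        for (long key : buildKeys) {
            present.set(bucketOf(key));
        }
    }

    /** Maps a key in [min, max] to one of numBuckets equally sized ranges. */
    private int bucketOf(long key) {
        // Double arithmetic avoids long overflow for very wide key ranges.
        double span = (double) max - (double) min + 1.0;
        int bucket = (int) (((double) key - (double) min) / span * numBuckets);
        return Math.min(bucket, numBuckets - 1);
    }

    /** False means the probe row cannot find a join partner and may be skipped. */
    boolean mightContain(long key) {
        if (key < min || key > max) {
            return false;
        }
        return present.get(bucketOf(key));
    }
}
```

The filter costs numBuckets bits regardless of build size, so it pays off mainly when build keys cluster into a few ranges, which is the kind of distribution Stefan alludes to.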

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-18 Thread Lijie Wang
Hi Stefan,

Thanks for your feedback. Let me briefly summarize the optimization points
you mentioned above (please correct me if I'm wrong):

1. Build an extra hash table for deduplication before building the bloom
filter.
2. Use the two-phase approach to build the bloom filter (first local, then
OR-combine).
3. Use blocked bloom filters to improve the cache efficiency.

For the above 3 points, I have the following questions or opinions:

For point 1, it seems that building a hash table also requires traversing
all build-side data, and the overhead seems to be the same as building a
bloom filter directly? In addition, the hash table will take up more space
when the amount of data is large, which is why we chose to use a bloom
filter instead of a hash table.
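The trade-off in point 1 can be made concrete with a hypothetical sketch. The class names, the double-hashing scheme and the use of java.util.BitSet are assumptions, not Flink's BloomFilter. Both variants read every build row once; deduplicating first cuts bloom filter insertions from rows × k to NDV × k and yields the exact NDV, at the cost of holding every distinct key in memory, which is the space concern raised above.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical comparison of direct vs. deduplicated bloom filter builds. */
final class DedupThenBloomSketch {

    private DedupThenBloomSketch() {}

    /** Sets numHashes bit positions for one key using double hashing. */
    static void insert(BitSet bits, int numBits, int numHashes, long key) {
        int h1 = Long.hashCode(key);
        int h2 = Long.hashCode(key * 0x9E3779B97F4A7C15L) | 1; // odd step
        for (int i = 0; i < numHashes; i++) {
            bits.set(Math.floorMod(h1 + i * h2, numBits));
        }
    }

    /** Returns {exactNdv, bloomBitSets} when keys are deduplicated first. */
    static long[] buildDeduplicated(long[] buildKeys, BitSet bits, int numBits, int numHashes) {
        Set<Long> keySet = new HashSet<>(); // stands in for the join's hash table
        for (long key : buildKeys) {
            keySet.add(key); // still one pass over every build row
        }
        for (long key : keySet) {
            insert(bits, numBits, numHashes, key);
        }
        return new long[] {keySet.size(), (long) keySet.size() * numHashes};
    }

    /** Inserts every row directly; returns the number of bit sets performed. */
    static long buildDirect(long[] buildKeys, BitSet bits, int numBits, int numHashes) {
        for (long key : buildKeys) {
            insert(bits, numBits, numHashes, key);
        }
        return (long) buildKeys.length * numHashes;
    }
}
```

For keys {1, 1, 1, 2, 2, 3} with 3 hash functions, the deduplicated build reports NDV 3 and 9 bit sets versus 18 for the direct build, and both produce the same filter.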

For point 2, I think it's a good idea to use the two-phase approach to
build the bloom filter. But rather than directly broadcasting the local
bloom filters to the probe side, I prefer to introduce a global node for the
OR-combine (like two-phase aggregation [1]), then broadcast the combined bloom
filter to the probe side. The latter reduces the amount of data transferred
over the network. I will change the FLIP accordingly.
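A minimal Java sketch of the two-phase build in point 2, assuming every subtask uses the same filter size and hash functions (a prerequisite for OR-combining). The class name, sizes, and hashing scheme are illustrative assumptions, not the FLIP's design.

```java
import java.util.BitSet;
import java.util.List;

/** Hypothetical sketch of a two-phase bloom filter build; not Flink's implementation. */
final class TwoPhaseBloomBuild {
    // Every local filter must share size and hash functions, otherwise OR-combining is meaningless.
    static final int NUM_BITS = 1 << 12;
    static final int NUM_HASHES = 3;

    private TwoPhaseBloomBuild() {}

    /** i-th bit position for a key, via double hashing. */
    static int position(long key, int i) {
        int h1 = Long.hashCode(key);
        int h2 = Long.hashCode(key * 0x9E3779B97F4A7C15L) | 1;
        return Math.floorMod(h1 + i * h2, NUM_BITS);
    }

    /** Phase 1: each build subtask fills a filter from its own partition only. */
    static BitSet buildLocal(List<Long> partitionKeys) {
        BitSet bits = new BitSet(NUM_BITS);
        for (long key : partitionKeys) {
            for (int i = 0; i < NUM_HASHES; i++) {
                bits.set(position(key, i));
            }
        }
        return bits;
    }

    /** Phase 2: one global node ORs the local filters; only this result is broadcast. */
    static BitSet orCombine(List<BitSet> localFilters) {
        BitSet global = new BitSet(NUM_BITS);
        for (BitSet local : localFilters) {
            global.or(local);
        }
        return global;
    }

    /** false means the key is definitely absent from every partition. */
    static boolean mightContain(BitSet filter, long key) {
        for (int i = 0; i < NUM_HASHES; i++) {
            if (!filter.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}
```

Because bitwise OR is associative and commutative, the combined filter is identical to one built over all keys at once, while the probe side receives a single filter instead of one per build subtask.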

For point 3, I think it's a nice optimization, but I'd prefer to leave it as a
future improvement. There is already a bloom filter implementation in Flink
that we can simply reuse. Introducing a new bloom filter implementation adds
complexity (we would need to implement it, test it, etc.), and it is not the
focus of this FLIP.
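For reference on point 3, a blocked bloom filter (the structure Stefan describes below) can be sketched in Java as follows. This is a hypothetical standalone class, not Flink's existing bloom filter; the 512-bit block size is an assumption chosen to match a typical 64-byte cache line, and the software-managed buffering he mentions is not shown.

```java
/** Hypothetical blocked bloom filter: all bits of one key land in a single 512-bit block. */
final class BlockedBloomFilter {
    private static final int WORDS_PER_BLOCK = 8;                       // 8 x 64 bits = 64 bytes (assumed cache line)
    private static final int BITS_PER_BLOCK = WORDS_PER_BLOCK * Long.SIZE;

    private final long[] words;
    private final int numBlocks;
    private final int numHashes;

    BlockedBloomFilter(int numBlocks, int numHashes) {
        this.numBlocks = numBlocks;
        this.numHashes = numHashes;
        this.words = new long[numBlocks * WORDS_PER_BLOCK];
    }

    /** SplitMix64 finalizer, used here as a cheap 64-bit mixing function. */
    private static long mix(long x) {
        long z = x + 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /** First word index of the block that owns the key with hash h. */
    private int blockBase(long h) {
        return (int) Math.floorMod(h, (long) numBlocks) * WORDS_PER_BLOCK;
    }

    /** i-th bit offset inside the block, derived from a second hash g. */
    private static int bitInBlock(long g, int i) {
        int g1 = (int) g;
        int g2 = (int) (g >>> 32) | 1;
        return Math.floorMod(g1 + i * g2, BITS_PER_BLOCK);
    }

    void add(long key) {
        long h = mix(key);
        int base = blockBase(h);
        long g = mix(h);
        for (int i = 0; i < numHashes; i++) {
            int bit = bitInBlock(g, i);
            words[base + bit / Long.SIZE] |= 1L << (bit % Long.SIZE);
        }
    }

    boolean mightContain(long key) {
        long h = mix(key);
        int base = blockBase(h);
        long g = mix(h);
        for (int i = 0; i < numHashes; i++) {
            int bit = bitInBlock(g, i);
            if ((words[base + bit / Long.SIZE] & (1L << (bit % Long.SIZE))) == 0) {
                return false;
            }
        }
        return true;
    }
}
```

Each insert or lookup touches a single block, so at most one cache line per key, at the cost of a somewhat higher false-positive rate than a standard filter with the same number of bits.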

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation

Best,
Lijie

On Fri, Jun 16, 2023 at 4:45 PM Stefan Richter wrote:

> Hi,
>
> Thanks for the proposal of this feature! I have a question about the
> filter build and some suggestions for potential improvements. First, I
> wonder why you suggest running the filter builder as a separate operator with
> parallelism 1. I’d suggest integrating the distributed filter build with
> the hash table build phase as follows:
>
> 1. Build the hash table completely in each subtask.
> 2. The keyset of the hash table gives us a precise NDV count for every
> subtask.
> 3. Build a filter from the subtask hash table. For low cardinality tables,
> I’d go with the suggested optimization of IN-filter.
> 4. Each build subtask transfers the local bloom filter to all probe
> operators.
> 5. On the probe operator we can either probe against the individual
> filters, or we OR-combine all subtask filters into an aggregated bloom filter.
>
> I’m suggesting this because inserting into a (larger) bloom
> filter can be costly, especially once the filter exceeds cache sizes, and is
> therefore better parallelized. First inserting into the hash table also
> deduplicates the keys, so we avoid inserting records twice into the bloom
> filter. If we want to improve cache efficiency for the build of larger
> filters, we could structure them as blocked bloom filters, where the filter
> is separated into blocks and all bits of one key go only into one block.
> That allows us to apply software managed buffering to first group keys that
> go into the same partition (ideally fitting into cache) and then bulk load
> partitions once we collected enough keys for one round of loading.
>
> Best,
> Stefan
>
>
> Stefan Richter
> Principal Engineer II
>
>
>
>
> > On 15. Jun 2023, at 13:35, Lijie Wang  wrote:
> >
> > Hi,  Benchao and Aitozi,
> >
> > Thanks for your feedback about this FLIP.
> >
> > @Benchao
> >
> >>> I think it would be reasonable to also support "pipeline shuffle" if
> >>> possible.
> > As I said above, the runtime filter can work well with all shuffle modes,
> > including pipeline shuffle.
> >
> >>> if the RuntimeFilterBuilder could finish more quickly than the RuntimeFilter
> >>> operator, it can still filter out additional data afterwards.
> > I think the main purpose of the runtime filter is to reduce the shuffle data
> > and the data arriving at the join. Although eagerly running the large
> > table side can process data in advance, most of the data may be
> > irrelevant, causing huge shuffle overhead and slowing the join. In
> > addition, if the join is a hash join, the probe side of the hash join also
> > needs to wait for its build side to complete, so the large table side is
> > likely to be back-pressured.
> > In addition, I'd prefer not to add too many configuration options in the
> > first version, which may make it more d

Re: [VOTE] FLIP-287: Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-16 Thread Lijie Wang
+1 (binding)

Thanks for driving it, Joao.

Best,
Lijie

On Fri, Jun 16, 2023 at 3:53 PM Joao Boto wrote:

> Hi all,
>
> Thank you to everyone for the feedback on FLIP-287[1]. Based on the
> discussion thread [2], we have come to a consensus on the design and are
> ready to take a vote to contribute this to Flink.
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (excluding weekends), unless there is an objection or an insufficient
> number of votes.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]https://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
>
>
> Best,
> Joao Boto
>


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Lijie Wang
Hi Benchao and Aitozi,

Thanks for your feedback about this FLIP.

@Benchao

>> I think it would be reasonable to also support "pipeline shuffle" if
>> possible.
As I said above, the runtime filter can work well with all shuffle modes,
including pipeline shuffle.

>> if the RuntimeFilterBuilder could finish more quickly than the RuntimeFilter
>> operator, it can still filter out additional data afterwards.
I think the main purpose of the runtime filter is to reduce the shuffle data
and the data arriving at the join. Although eagerly running the large
table side can process data in advance, most of the data may be
irrelevant, causing huge shuffle overhead and slowing the join. In
addition, if the join is a hash join, the probe side of the hash join also
needs to wait for its build side to complete, so the large table side is
likely to be back-pressured.
In addition, I'd prefer not to add too many configuration options in the
first version, which may make it more difficult to use (users would need to
understand a lot of internal implementation details). Maybe it could be a
future improvement (if it's worthwhile)?


@Aitozi

>> IMO, in the current implementation two source table operators will be
>> executed simultaneously.
The example in the FLIP uses blocking shuffle (I will add this point to the FLIP).
The runtime filter is generally chained with the large table side to reduce
the shuffle data (as shown in Figure 2 of the FLIP). The job vertices should be
scheduled in topological order, so the large table side can only be
scheduled after the RuntimeFilterBuilder finishes.
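A toy illustration of the scheduling argument: with blocking edges, a vertex is released only once all of its inputs have finished, which yields a topological order. The Java below uses Kahn's algorithm on a hypothetical four-vertex graph; the vertex names are made up to mirror the description and are not taken from the FLIP's figures, and this is not Flink's scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: order job vertices so each one runs only after all of its inputs. */
final class TopoSchedule {
    private TopoSchedule() {}

    static List<String> order(Map<String, List<String>> downstream) {
        // Count the inputs of every vertex.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        downstream.forEach((vertex, outputs) -> {
            inDegree.putIfAbsent(vertex, 0);
            for (String out : outputs) {
                inDegree.merge(out, 1, Integer::sum);
            }
        });
        // Vertices without inputs (the sources) can start immediately.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((vertex, degree) -> {
            if (degree == 0) {
                ready.add(vertex);
            }
        });
        List<String> scheduled = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            scheduled.add(vertex);
            // A finished vertex releases each consumer once it has no unfinished inputs left.
            for (String out : downstream.getOrDefault(vertex, Collections.emptyList())) {
                if (inDegree.merge(out, -1, Integer::sum) == 0) {
                    ready.add(out);
                }
            }
        }
        if (scheduled.size() != inDegree.size()) {
            throw new IllegalStateException("graph has a cycle");
        }
        return scheduled;
    }
}
```

For a graph DimSource -> {RuntimeFilterBuilder, HashJoin}, RuntimeFilterBuilder -> FactSource+RuntimeFilter, FactSource+RuntimeFilter -> HashJoin, the large-table vertex is only released after the RuntimeFilterBuilder, matching the reply above.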

>> Are there any tests showing that the default value of
>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
>> value?
It's not tested yet, but it will be before the code is merged. The current
value is based on systems such as Spark and Hive. Before merging the code, we
will test on TPC-DS 10T to find an optimal set of values. If you have
relevant experience with it, suggestions are welcome.

>> What's the representation of the runtime filter node in planner ?
As shown in Figure 1 of the FLIP, we intend to add two new physical nodes,
RuntimeFilterBuilder and RuntimeFilter.

Best,
Lijie

On Thu, Jun 15, 2023 at 3:52 PM Aitozi wrote:

> Hi Lijie,
>
> Nice to see this valuable feature. After reading the FLIP I have some
> questions below:
>
> >Schedule the TableSource(dim) first.
>
> How does it know to schedule the TableSource(dim) first? IMO, in the
> current implementation two source table operators will be executed
> simultaneously.
>
> >If the data volume on the probe side is too small, the overhead of
> building runtime filter is not worth it.
>
> Are there any tests showing that the default value of
> table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
> value? The same goes for table.optimizer.runtime-filter.max-build-data-size.
>
> >the runtime filter can be pushed down along the probe side, as close to
> data sources as possible
>
> What's the representation of the runtime filter node in the planner? Is it a
> Filter node?
>
> Best,
>
> Aitozi.
>
> On Thu, Jun 15, 2023 at 2:30 PM Benchao Li wrote:
>
> > Hi Lijie,
> >
> > Regarding the shuffle mode, I think it would be reasonable to also
> > support "pipeline shuffle" if possible.
> >
> > "Pipeline shuffle" is essential for OLAP/MPP computing. Although it has
> > not been much exposed to users for now, I know a few companies that use
> > Flink as an MPP computing engine, and there is an ongoing effort [1] to
> > make this usage more powerful.
> >
> > Back to your concern that "Even if the RuntimeFilter becomes running before
> > the RuntimeFilterBuilder finished, it will not process any data and will
> > occupy resources": whether it benefits us depends on the scale of data. If
> > the RuntimeFilterBuilder could finish more quickly than the RuntimeFilter
> > operator, it can still filter out additional data afterwards. Hence, in my
> > opinion, we do not need to make the edge between RuntimeFilterBuilder and
> > RuntimeFilter BLOCKING only; at least it could be configurable.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-25318
> >
> > On Thu, Jun 15, 2023 at 2:18 PM Lijie Wang wrote:
> >
> > > Hi Yuxia,
> > >
> > > I made a mistake in the above response.
> > >
> > > The runtime filter can work well with all shuffle modes. However, hybrid
> > > shuffle and blocking shuffle are currently recommended for batch jobs
> > > (pipeline shuffle is not recommended).
> > >
> > > One more thing to mention here is that we will force the edge between
> > > RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(reg

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi Yuxia,

I made a mistake in the above response.

The runtime filter can work well with all shuffle modes. However, hybrid
shuffle and blocking shuffle are currently recommended for batch jobs
(pipeline shuffle is not recommended).

One more thing to mention here is that we will force the edge between
RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which
BatchShuffleMode is set), because the RuntimeFilter does not need to
run before the RuntimeFilterBuilder has finished. Even if the RuntimeFilter
started running before the RuntimeFilterBuilder finished, it would not
process any data and would only occupy resources.

Best,
Lijie

On Thu, Jun 15, 2023 at 9:48 AM Lijie Wang wrote:

> Hi Yuxia,
>
> Thanks for your feedback. The answers to your questions are as follows:
>
> 1. Yes, the row count comes from the statistics of the underlying table (or is
> estimated based on the statistics of the underlying table, if the build side or
> probe side is not a TableScan). If the statistics are unavailable, we will not
> inject a runtime filter (as you said, we can hardly evaluate the benefits).
> Besides, AFAIK, the estimated data size of the build side is also based on the
> row count statistics, that is, if the statistics are unavailable, the
> requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be
> evaluated either. I'll add this point to the FLIP.
>
> 2.
> Estimated data size does not meet the requirement (in the planner optimization
> phase) -> no filter
> Estimated data size meets the requirement (in the planner optimization phase),
> but the real data size does not meet the requirement (in the execution phase) ->
> fake filter
> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>
> Best,
> Lijie
>
> On Wed, Jun 14, 2023 at 8:37 PM yuxia wrote:
>
>> Thanks Lijie for starting this discussion. Excited to see the runtime filter
>> being implemented in Flink.
>> I have a few questions about it:
>>
>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
>> instead`. So, does the row count come from the statistics of the underlying
>> table? What if the statistics are also unavailable, considering that users
>> may not always remember to generate statistics in production?
>> I'm wondering whether it makes sense to just disable the runtime filter if
>> statistics are unavailable, since in that case we can hardly evaluate the
>> benefits of the runtime filter.
>>
>>
>> 2: The FLIP said: "We will inject the runtime filters only if the
>> following requirements are met:xxx", but it also said, "Once this limit is
>> exceeded, it will output a fake filter(which always returns true)" in
>> the `RuntimeFilterBuilderOperator` part. They seem contradictory, so I'm
>> wondering what the real behavior is: will no filter be injected, or a fake
>> filter?
>>
>>
>> 3: Does it also mean the runtime filter can only take effect with blocking
>> shuffle?
>>
>>
>>
>> Best regards,
>> Yuxia
>>
>> - Original Message -
>> From: "ron9 liu"
>> To: "dev"
>> Sent: Wednesday, June 14, 2023 5:29:28 PM
>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>>
>> Thanks Lijie for starting this discussion. Runtime Filter is a common
>> optimization to improve join performance that has been adopted by many
>> computing engines such as Spark, Doris, etc. Flink is a unified streaming and
>> batch computing engine, and we are continuously optimizing its batch
>> performance. Runtime filter is a general performance optimization technique
>> that can improve the performance of Flink batch jobs, so we are introducing
>> it for batch as well.
>>
>> Looking forward to all feedback.
>>
>> Best,
>> Ron
>>
>> On Wed, Jun 14, 2023 at 5:17 PM Lijie Wang wrote:
>>
>> > Hi devs
>> >
>> > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
>> > Introduce Runtime Filter for Flink Batch Jobs[1]
>> >
>> > Runtime Filter is a common optimization to improve join performance. It is
>> > designed to dynamically generate filter conditions for certain Join queries
>> > at runtime to reduce the amount of scanned or shuffled data, avoid
>> > unnecessary I/O and network transmission, and speed up the query. It
>> > works by first building a filter (e.g. a bloom filter) based on the data
>> > on the small table side (build side), then passing this filter to the
>> > large table side (probe side) to filter out the irrelevant data there. This
>> > reduces the data reaching the join and improves performance.
>> >
>> > You can find more details in the FLIP-324[1]. Looking forward to your
>> > feedback.
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>> >
>> > Best,
>> > Ron & Gen & Lijie
>> >
>>
>


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi Yuxia,

Thanks for your feedback. The answers to your questions are as follows:

1. Yes, the row count comes from the statistics of the underlying table (or is
estimated based on the statistics of the underlying table, if the build side or
probe side is not a TableScan). If the statistics are unavailable, we will not
inject a runtime filter (as you said, we can hardly evaluate the benefits).
Besides, AFAIK, the estimated data size of the build side is also based on the
row count statistics, that is, if the statistics are unavailable, the
requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be
evaluated either. I'll add this point to the FLIP.

2.
Estimated data size does not meet the requirement (in the planner optimization
phase) -> no filter
Estimated data size meets the requirement (in the planner optimization phase),
but the real data size does not meet the requirement (in the execution phase) ->
fake filter

3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
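The behavior described in answers 1 and 2 above can be summarized in a small Java sketch. The class, method names, and the build-side limit are hypothetical; only the 10G probe-side figure comes from this thread (the proposed default of table.optimizer.runtime-filter.min-probe-data-size), and reading it as 10 GiB is an assumption.

```java
import java.util.OptionalLong;
import java.util.function.LongPredicate;

/** Hypothetical summary of when a runtime filter is injected and when it becomes a fake filter. */
final class RuntimeFilterDecision {
    static final long MAX_BUILD_BYTES = 100L << 20;  // hypothetical build-side limit (100 MiB)
    static final long MIN_PROBE_BYTES = 10L << 30;   // proposed 10G probe-side default, read as 10 GiB

    private RuntimeFilterDecision() {}

    /** Planner phase: missing statistics or unmet estimates mean no filter is injected at all. */
    static boolean shouldInject(OptionalLong estimatedBuildBytes, OptionalLong estimatedProbeBytes) {
        if (!estimatedBuildBytes.isPresent() || !estimatedProbeBytes.isPresent()) {
            return false;
        }
        return estimatedBuildBytes.getAsLong() <= MAX_BUILD_BYTES
                && estimatedProbeBytes.getAsLong() >= MIN_PROBE_BYTES;
    }

    /** Execution phase: if the real build side exceeds the limit, emit a fake filter. */
    static LongPredicate buildFilter(long actualBuildBytes, LongPredicate realFilter) {
        if (actualBuildBytes > MAX_BUILD_BYTES) {
            return key -> true; // fake filter: always returns true, so every probe row passes
        }
        return realFilter;
    }
}
```

The point of the split is that the planner can only decide based on estimates, so the execution-time check needs a safe fallback that keeps all rows rather than a filter that might drop matching ones.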

Best,
Lijie

On Wed, Jun 14, 2023 at 8:37 PM yuxia wrote:

> Thanks Lijie for starting this discussion. Excited to see the runtime filter
> being implemented in Flink.
> I have a few questions about it:
>
> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> instead`. So, does the row count come from the statistics of the underlying
> table? What if the statistics are also unavailable, considering that users
> may not always remember to generate statistics in production?
> I'm wondering whether it makes sense to just disable the runtime filter if
> statistics are unavailable, since in that case we can hardly evaluate the
> benefits of the runtime filter.
>
>
> 2: The FLIP said: "We will inject the runtime filters only if the
> following requirements are met:xxx", but it also said, "Once this limit is
> exceeded, it will output a fake filter(which always returns true)" in
> the `RuntimeFilterBuilderOperator` part. They seem contradictory, so I'm
> wondering what the real behavior is: will no filter be injected, or a fake
> filter?
>
>
> 3: Does it also mean the runtime filter can only take effect with blocking
> shuffle?
>
>
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "ron9 liu"
> To: "dev"
> Sent: Wednesday, June 14, 2023 5:29:28 PM
> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>
> Thanks Lijie for starting this discussion. Runtime Filter is a common optimization
> to improve join performance that has been adopted by many computing
> engines such as Spark, Doris, etc. Flink is a unified streaming and batch computing
> engine, and we are continuously optimizing its batch performance.
> Runtime filter is a general performance optimization technique that can
> improve the performance of Flink batch jobs, so we are introducing it for
> batch as well.
>
> Looking forward to all feedback.
>
> Best,
> Ron
>
> On Wed, Jun 14, 2023 at 5:17 PM Lijie Wang wrote:
>
> > Hi devs
> >
> > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> > Introduce Runtime Filter for Flink Batch Jobs[1]
> >
> > Runtime Filter is a common optimization to improve join performance. It is
> > designed to dynamically generate filter conditions for certain Join queries
> > at runtime to reduce the amount of scanned or shuffled data, avoid
> > unnecessary I/O and network transmission, and speed up the query. It
> > works by first building a filter (e.g. a bloom filter) based on the data
> > on the small table side (build side), then passing this filter to the
> > large table side (probe side) to filter out the irrelevant data there. This
> > reduces the data reaching the join and improves performance.
> >
> > You can find more details in the FLIP-324[1]. Looking forward to your
> > feedback.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >
> > Best,
> > Ron & Gen & Lijie
> >
>


[DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi devs

Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
Introduce Runtime Filter for Flink Batch Jobs[1]

Runtime Filter is a common optimization to improve join performance. It is
designed to dynamically generate filter conditions for certain Join queries
at runtime to reduce the amount of scanned or shuffled data, avoid
unnecessary I/O and network transmission, and speed up the query. It works
by first building a filter (e.g. a bloom filter) based on the data on the
small table side (build side), then passing this filter to the large table
side (probe side) to filter out the irrelevant data there. This reduces the
data reaching the join and improves performance.
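To make the working principle concrete, here is a minimal Java sketch: build a bloom filter from the small table's join keys, then drop probe-side rows whose keys cannot match. The classes are hypothetical and self-contained; they are not Flink's bloom filter or the operators proposed in the FLIP, and the filter size and hash count are arbitrary.

```java
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

/** Minimal bloom filter over long join keys (hypothetical; not Flink's implementation). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** i-th bit position via double hashing. */
    private int position(long key, int i) {
        int h1 = Long.hashCode(key);
        int h2 = Long.hashCode(key * 0x9E3779B97F4A7C15L) | 1;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(long key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(key, i));
        }
    }

    /** false = definitely absent; true = possibly present. */
    boolean mightContain(long key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }
}

/** Hypothetical build and probe steps of a runtime filter. */
final class RuntimeFilterSketch {
    private RuntimeFilterSketch() {}

    /** Build side: insert every join key of the small table. */
    static SimpleBloomFilter build(List<Long> buildKeys) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 16, 3);
        for (long key : buildKeys) {
            filter.add(key);
        }
        return filter;
    }

    /** Probe side: drop rows whose key cannot have a join partner, before they are shuffled. */
    static List<Long> probe(SimpleBloomFilter filter, List<Long> probeKeys) {
        return probeKeys.stream().filter(filter::mightContain).collect(Collectors.toList());
    }
}
```

A false from mightContain is definitive, so no row that could join is ever dropped; a true may be a false positive, which the join itself later discards.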

You can find more details in the FLIP-324[1]. Looking forward to your
feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs

Best,
Ron & Gen & Lijie


[jira] [Created] (FLINK-32244) flink-master-benchmarks-regression-check always fails since 2023.05.30

2023-06-01 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32244:
--

 Summary: flink-master-benchmarks-regression-check always fails since 2023.05.30
 Key: FLINK-32244
 URL: https://issues.apache.org/jira/browse/FLINK-32244
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.18.0
Reporter: Lijie Wang


Since 2023.05.30, the flink-master-benchmarks-regression-check has failed every time:

[http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-regression-check/1631|http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-regression-check/1631/]

[http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-regression-check/1632|http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-regression-check/1632/]

[http://codespeed.dak8s.net:8080/job/flink-master-benchmarks-regression-check/1633]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-23 Thread Lijie Wang
Hi Joao,

I noticed the FLIP currently contains the following two methods related to the type
serializer:

(1)  TypeSerializer createInputSerializer();
(2)  TypeSerializer createSerializer(TypeInformation inType);

Is method (2) still needed now?

Best,
Lijie

On Fri, May 19, 2023 at 4:53 PM João Boto wrote:

> Updated the FLIP to use this option.
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-17 Thread Lijie Wang
Hi,

+1 for `InitContext#createInputSerializer()` .

I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.
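For context on why the serializer is needed at all: as João's message below explains, a buffering sink has to copy each record before buffering it when object reuse is enabled. The Java sketch below shows that pattern with a hypothetical writer class (not Flink's SinkWriter API); the UnaryOperator stands in for TypeSerializer#copy, and the flag would come from the proposed isObjectReuseEnabled().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical buffering writer illustrating the copy-on-object-reuse pattern. */
final class BufferingWriterSketch<T> {
    private final boolean objectReuseEnabled;
    private final UnaryOperator<T> copier;          // stand-in for TypeSerializer#copy(T)
    private final List<T> buffer = new ArrayList<>();

    BufferingWriterSketch(boolean objectReuseEnabled, UnaryOperator<T> copier) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
    }

    void write(T record) {
        // With object reuse the runtime may hand over the same mutable instance again,
        // so a buffered reference would silently change; buffer a copy instead.
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
    }

    List<T> buffered() {
        return buffer;
    }
}
```

Without object reuse, the extra copy is skipped, which is why the sink needs to know the flag and not just have a serializer.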

Best,
Lijie.

On Mon, Apr 24, 2023 at 7:04 PM João Boto wrote:

> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve
> more than one problem, but I can't find a way to get the TypeInformation
> in InitContextImpl (I may be missing something).
>
> In the current (legacy) implementations we rely on the interface
> `InputTypeConfigurable` to get the TypeInformation, but this will not work
> for Sink2, as it is not implemented there (DataStream.sinkTo vs DataStream.addSink).
> As a side note, the ExecutionConfig provided by this interface could not
> be used, as it can be changed after the call is made, for example during table
> planning in DefaultExecutor.configureBatchSpecificProperties().
>
> At the end what we need to do is something like:
> if (isObjectReuseEnabled()) serializer.copy(record) else record;
>
> So, responding to your question: yes, the last option is OK for this, but I
> don't see how to implement it, as I'm missing the TypeInformation in
> InitContextImpl.
>
> Best regards,
>
> On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> > Do we have to introduce `InitContext#createSerializer(TypeInformation)`
> > which returns TypeSerializer, or is it sufficient to only provide
> > `InitContext#createInputSerializer()` which returns TypeSerializer?
> >
> > I had the impression that buffering sinks like JDBC only need the
> > latter. @Joao, could you confirm?
> >
> > If that's the case, +1 to adding the following method signatures to
> > InitContext:
> > * TypeSerializer createInputSerializer()
> > * boolean isObjectReuseEnabled()
> >
> > Thanks,
> > Gordon
> >
> > On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu  wrote:
> >
> > > Good point! @Gordon
> > > Introducing an `InitContext#createSerializer(TypeInformation)` looks like a
> > > better option to me, so we do not need to introduce an unmodifiable
> > > `ExecutionConfig` at this moment.
> > >
> > > I hope that we can make `ExecutionConfig` a read-only interface in
> > > Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
> > > while mutating the values at runtime is actually an undefined behavior.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Apr 18, 2023 at 1:02 AM Tzu-Li (Gordon) Tai wrote:
> > > >
> > > > Hi,
> > > >
> > > > Sorry for chiming in late.
> > > >
> > > > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig
> > > > directly through Sink#InitContext is the right thing to do.
> > > >
> > > > 1. A lot of the read-only getter methods on ExecutionConfig are
> > > > irrelevant for sinks. Expanding the scope of the InitContext interface
> > > > with so many irrelevant methods is probably going to make writing unit
> > > > tests a pain.
> > > >
> > > > 2. There are actually a few getter methods on `InitContext` that have
> > > > duplicate/redundant info for what ExecutionConfig exposes. For example,
> > > > InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> > > > currently exist, and it can be confusing if users find two sources of
> > > > that information (either via the `InitContext` or via the wrapped
> > > > `ExecutionConfig`).
> > > >
> > > > All in all, it feels like `Sink#InitContext` was initially introduced as
> > > > a means to selectively expose only certain information to sinks.
> > > >
> > > > It looks like right now, the only requirement is that some sinks require
> > > > 1) isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would
> > > > it make sense to follow the original intent and only selectively expose
> > > > these?
> > > > For 1), we can just add a new method to `InitContext` and forward the
> > > > information from `ExecutionConfig` accessible at the operator level.
> > > > For 2), would it make sense to create the serializer at the operator
> > > > level and then provide it through `InitContext`?
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu  wrote:
> > > >
> > > > > We can let the `InitContext` return `ExecutionConfig` in the
> > > > > interface. However, a `ReadableExecutionConfig` implementation should
> > > > > be returned so that exceptions will be thrown if users try to modify
> > > > > the `ExecutionConfig`.
> > > > >
> > > > > We can rework all the setters of `ExecutionConfig` to internally
> > > > > invoke a `setConfiguration(...)` method. Then the
> > > > > `ReadableExecutionConfig` can just override that method. But pay
> > > > > attention to a few exceptional setters, i.e. those for
> > > > > globalJobParameters and serializers.
> > > > >
> > > > > We should also explicitly state in the documentation of
> > > > > `InitContext#getExecutionConfig()` that the returned `ExecutionConfig`
> > > > > is unmodifiable.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > João B

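The read-only `ExecutionConfig` idea Zhu Zhu describes in the thread above (every setter funnels through one `setConfiguration(...)` method, which a read-only subclass overrides to throw) can be sketched in Java like this. The classes are hypothetical stand-ins, not Flink's `ExecutionConfig`; as Zhu noted, setters that bypass the common method (such as those for global job parameters and serializers) would need separate handling, and the thread later moved toward exposing only `createInputSerializer()` and `isObjectReuseEnabled()` instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mutable config whose setters all go through one method. */
class ConfigSketch {
    private final Map<String, String> values = new HashMap<>();

    ConfigSketch() {}

    /** Copy constructor; reads the map directly, so it never calls setConfiguration. */
    ConfigSketch(ConfigSketch source) {
        values.putAll(source.values);
    }

    /** Single choke point for every mutation. */
    protected void setConfiguration(String key, String value) {
        values.put(key, value);
    }

    public void setParallelism(int parallelism) {
        setConfiguration("parallelism", Integer.toString(parallelism));
    }

    public void enableObjectReuse() {
        setConfiguration("object-reuse", "true");
    }

    public String get(String key) {
        return values.get(key);
    }
}

/** Hypothetical read-only view: overriding the choke point disables every setter at once. */
final class ReadOnlyConfigSketch extends ConfigSketch {
    ReadOnlyConfigSketch(ConfigSketch source) {
        super(source);
    }

    @Override
    protected void setConfiguration(String key, String value) {
        throw new UnsupportedOperationException("config is read-only: " + key);
    }
}
```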
Re: [DISCUSS] FLIP-229: Prometheus Sink Connector

2023-05-15 Thread Lijie Wang
Hi Karthi,

I think you are using the wrong FLIP ID: FLIP-229 has already been used [1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job

Best,
Lijie

On Tue, May 16, 2023 at 4:44 AM Martijn Visser wrote:

> Hi Karthi,
>
> Thanks for the FLIP and opening up the discussion. My main question is: why
> should we create a separate connector and not use and/or improve the
> existing integrations with Prometheus? I would like to understand more so
> that it can be added to the motivation of the FLIP.
>
> Best regards,
>
> Martijn
>
> On Mon, May 15, 2023 at 6:03 PM Karthi Thyagarajan 
> wrote:
>
> > Hello all,
> >
> > We would like to start a discussion thread on FLIP-229: Prometheus Sink
> > Connector [1] where we propose to provide a sink connector for Prometheus
> > [2] based on the Async Sink [3]. Looking forward to comments and
> > feedback.
> > Thank you.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Prometheus+Sink+Connector
> > [2] https://prometheus.io/
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >
>


Re: [DISCUSS] Release Flink 1.17.1

2023-05-15 Thread Lijie Wang
+1 for the release.

Best,
Lijie

On Mon, May 15, 2023 at 5:07 PM Jing Ge wrote:

> +1 for releasing 1.17.1
>
> Best Regards,
> Jing
>
> On Thu, May 11, 2023 at 10:03 AM Martijn Visser 
> wrote:
>
> > +1, thanks for volunteering!
> >
> > On Thu, May 11, 2023 at 9:23 AM Xintong Song 
> > wrote:
> >
> > > +1
> > >
> > > I'll help with the steps that require PMC privileges.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, May 11, 2023 at 3:12 PM Jingsong Li 
> > > wrote:
> > >
> > > > +1 for releasing 1.17.1
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Thu, May 11, 2023 at 1:29 PM Gyula Fóra 
> > wrote:
> > > > >
> > > > > +1 for the release
> > > > >
> > > > > Gyula
> > > > >
> > > > > On Thu, 11 May 2023 at 05:35, Yun Tang  wrote:
> > > > >
> > > > > > +1 for releasing flink-1.17.1
> > > > > >
> > > > > > The blocker issue might cause silently incorrect data, so it's better
> > > > > > to have a fix release ASAP.
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Yun Tang
> > > > > > 
> > > > > > From: weijie guo 
> > > > > > Sent: Thursday, May 11, 2023 11:08
> > > > > > To: dev@flink.apache.org; tonysong...@gmail.com <tonysong...@gmail.com>
> > > > > > Subject: [DISCUSS] Release Flink 1.17.1
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > >
> > > > > > I would like to discuss creating a new 1.17 patch release (1.17.1).
> > > > > > The last 1.17 release is nearly two months old, and since then, 66
> > > > > > tickets have been closed [1], of which 14 are blocker/critical [2].
> > > > > > Some of them are quite important, such as FLINK-31293 [3] and
> > > > > > FLINK-32027 [4].
> > > > > >
> > > > > >
> > > > > > I am not aware of any unresolved blockers and there are no
> > > > > > in-progress tickets [5].
> > > > > > Please let me know if there are any issues you'd like to be included
> > > > > > in this release that have not been merged yet.
> > > > > >
> > > > > >
> > > > > > If the community agrees to create this new patch release, I could
> > > > > > volunteer as the release manager, and Xintong can help with actions
> > > > > > that require a PMC role.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-32027?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.17.1%20%20and%20resolution%20%20!%3D%20%20Unresolved%20order%20by%20priority%20DESC
> > > > > >
> > > > > > [2]
> > > > > >
> > > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-31273?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.17.1%20and%20resolution%20%20!%3D%20Unresolved%20%20and%20priority%20in%20(Blocker%2C%20Critical)%20ORDER%20by%20priority%20%20DESC
> > > > > >
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-31293
> > > > > >
> > > > > > [4] https://issues.apache.org/jira/browse/FLINK-32027
> > > > > >
> > > > > > [5]
> > https://issues.apache.org/jira/projects/FLINK/versions/12352886
> > > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-32048) DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"

2023-05-10 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-32048:
--

 Summary: DecimalITCase.testAggMinGroupBy fails with "Insufficient number of network buffers"
 Key: FLINK-32048
 URL: https://issues.apache.org/jira/browse/FLINK-32048
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Tests
Affects Versions: 1.18.0
Reporter: Lijie Wang


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48855&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4]
{code:java}
May 10 09:37:41 Caused by: java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
May 10 09:37:41 at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:495)
May 10 09:37:41 at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:456)
May 10 09:37:41 at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory.lambda$createBufferPoolFactory$3(SingleInputGateFactory.java:330)
May 10 09:37:41 at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:274)
May 10 09:37:41 at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:105)
May 10 09:37:41 at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
May 10 09:37:41 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:654)
May 10 09:37:41 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
May 10 09:37:41 at java.lang.Thread.run(Thread.java:748)
{code}
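For a sense of scale, the error above works out to 2048 x 32768 bytes = 64 MiB of network memory. The Java sketch below shows how the three configuration keys named in the error interact under Flink's documented memory model (network memory is a fraction of total Flink memory, clamped to the min/max bounds, then split into segments); the class, method names, and example inputs are hypothetical, and other details of the memory model are ignored.

```java
/** Hypothetical helper mirroring the fraction/min/max rule for network memory. */
final class NetworkBufferMath {
    private NetworkBufferMath() {}

    /** Network memory = fraction x total Flink memory, clamped to [min, max]. */
    static long networkMemoryBytes(long totalFlinkMemoryBytes, double fraction, long minBytes, long maxBytes) {
        long derived = (long) (totalFlinkMemoryBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, derived));
    }

    /** Number of network buffers (memory segments) that fit into the network memory. */
    static long numBuffers(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }
}
```

For example, 256 MiB of total Flink memory with a 0.1 fraction derives only about 25.6 MiB, so a 64 MiB minimum takes over and yields exactly the 2048 buffers of 32 KiB reported in the error.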





Re: [DISCUSS] Preventing Mockito usage for the new code with Checkstyle

2023-04-25 Thread Lijie Wang
Thanks for driving this. +1 for the proposal.

Can we also prevent JUnit 4 usage in new code this way? We are currently
aiming to migrate our codebase to JUnit 5.

Best,
Lijie

On Tue, Apr 25, 2023 at 23:02, Piotr Nowojski wrote:

> Ok, thanks for the clarification.
>
> Piotrek
>
> On Tue, Apr 25, 2023 at 16:38, Chesnay Schepler wrote:
>
> > The checkstyle rule would just ban certain imports.
> > We'd add exclusions for all existing usages as we did when introducing
> > other rules.
> > So far we have usually disabled checkstyle rules for specific files.
> >
> > On 25/04/2023 16:34, Piotr Nowojski wrote:
> > > +1 to the idea.
> > >
> > > How would this checkstyle rule work? Are you suggesting to start with a
> > > number of exclusions? On what level will those exclusions be? Per file?
> > > Per line?
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Tue, Apr 25, 2023 at 13:18, David Morávek wrote:
> > >
> > >> Hi Everyone,
> > >>
> > >> A long time ago, the community decided not to use Mockito-based tests
> > >> because those are hard to maintain. This is already baked into our Code
> > >> Style and Quality Guide [1].
> > >>
> > >> Because we still have Mockito imported into the code base, it's very
> > >> easy for newcomers to unconsciously introduce new tests violating the
> > >> code style because they're unaware of the decision.
> > >>
> > >> I propose to prevent Mockito usage in new code with a Checkstyle rule,
> > >> which would eventually allow us to eliminate it. This could also
> > >> prevent some wasted work and unnecessary feedback cycles during reviews.
> > >>
> > >> WDYT?
> > >>
> > >> [1]
> > >> https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
> > >>
> > >> Best,
> > >> D.
> > >>
> >
> >
>
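As described in the thread, the rule would simply ban certain imports, with per-file exclusions for existing usages. In Checkstyle this would be expressed in XML configuration (for example via its built-in IllegalImport check); the Java sketch below only illustrates the underlying idea and also covers the JUnit 4 suggestion. The banned and allowed prefixes and the legacy allow-list are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative import-ban check; a stand-in for a Checkstyle rule, not Checkstyle itself. */
final class ImportBanCheck {
    // Prefixes new code must not import (assumed list): Mockito and JUnit 4.
    static final List<String> BANNED = List.of("org.mockito.", "org.junit.", "junit.framework.");
    // JUnit 5 lives under these packages and stays allowed.
    static final List<String> ALLOWED = List.of("org.junit.jupiter.", "org.junit.platform.");

    static boolean isBanned(String importedName) {
        for (String allowed : ALLOWED) {
            if (importedName.startsWith(allowed)) {
                return false;
            }
        }
        for (String banned : BANNED) {
            if (importedName.startsWith(banned)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the banned imports of one file, unless the file is on the legacy allow-list. */
    static List<String> violations(String fileName, String source, Set<String> legacyFiles) {
        List<String> result = new ArrayList<>();
        if (legacyFiles.contains(fileName)) {
            return result; // existing usages are excluded per file
        }
        for (String line : source.split("\n")) {
            String trimmed = line.trim();
            if (!trimmed.startsWith("import ")) {
                continue;
            }
            String name = trimmed.substring("import ".length()).trim();
            if (name.startsWith("static ")) {
                name = name.substring("static ".length()).trim();
            }
            if (name.endsWith(";")) {
                name = name.substring(0, name.length() - 1).trim();
            }
            if (isBanned(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String source = "import org.mockito.Mockito;\n"
                + "import org.junit.jupiter.api.Test;\n"
                + "import static org.junit.Assert.assertEquals;\n";
        System.out.println(violations("NewTest.java", source, Set.of("OldTest.java")));
    }
}
```

For the sample source this reports the Mockito import and the JUnit 4 static import, while the JUnit 5 import passes; a file on the legacy list is skipped entirely, mirroring the per-file exclusions Chesnay describes.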


[jira] [Created] (FLINK-31651) Improve logging of granting/revoking leadership in JobMasterServiceLeadershipRunner to INFO level

2023-03-28 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31651:
--

 Summary: Improve logging of granting/revoking leadership in 
JobMasterServiceLeadershipRunner to INFO level
 Key: FLINK-31651
 URL: https://issues.apache.org/jira/browse/FLINK-31651
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.15.4, 1.16.1, 1.17.0
Reporter: Lijie Wang


Currently, the log level for granting/revoking leadership in
JobMasterServiceLeadershipRunner is DEBUG. However, production jobs are usually
configured with the INFO log level, which makes it hard to understand from
Flink's logs why a JobMasterServiceLeadershipRunner is suddenly stopped. I
suggest raising these log statements to INFO level.





[jira] [Created] (FLINK-31650) Incorrect busyMsTimePerSecond metric value for FINISHED task

2023-03-28 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31650:
--

 Summary: Incorrect busyMsTimePerSecond metric value for FINISHED 
task
 Key: FLINK-31650
 URL: https://issues.apache.org/jira/browse/FLINK-31650
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / REST
Affects Versions: 1.16.1, 1.17.0
Reporter: Lijie Wang
 Attachments: busyMsTimePerSecond.png

As shown in the figure below, the busyMsTimePerSecond of the FINISHED task is 
100%, which is obviously unreasonable.
!busyMsTimePerSecond.png|width=827,height=341!





Re: [DISCUSS] Use 1.15.4 as the final patch version of Flink 1.15 and drop 1.15 support

2023-03-24 Thread Lijie Wang
Thanks for driving this, +1 for dropping 1.15 support.

Best,
Lijie

On Fri, Mar 24, 2023 at 17:58, Leonard Xu wrote:

> +1 as 1.17.0 is available.
>
> Best,
> Leonard
>
> > On Mar 24, 2023, at 5:50 PM, Jing Ge  wrote:
> >
> > Thanks Qingsheng for driving this. +1 since there are no changes after
> > 1.15.4
> >
> > Best regards,
> > Jing
> >
> > On Fri, Mar 24, 2023 at 10:34 AM Qingsheng Ren wrote:
> > Hi devs,
> >
> > I'd like to start a discussion about marking Flink 1.15 as an end-of-life
> > version, and taking 1.15.4 directly as the final patch version for Flink
> > 1.15.
> >
> > As discussed in the mailing list before [1], once a minor version is
> > released (1.17 now), we need to release a final patch version (1.15.5) for
> > the minor version losing support (1.15). We release managers of 1.17 made
> > a check and found no changes after 1.15.4. So hereby I'd like to propose
> > that we use 1.15.4 directly as the final patch version of 1.15, and drop
> > support for 1.15 from now on.
> >
> > Looking forward to your feedback!
> >
> > [1] https://lists.apache.org/thread/szq23kr3rlkm80rw7k9n95js5vqpsnbv
> >
> > Best Regards,
> > Qingsheng
>
>


Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-20 Thread Lijie Wang
Hi Panagiotis,

Thanks for driving this.

+1 for supporting custom restart strategies; we have received such requests
on the user mailing list [1][2].

Besides, in the current design, the plugin only does statistical and
classification work and does not affect the *FailureHandlingResult*. It only
listens and does no handling, which doesn't quite match the title.

[1] https://lists.apache.org/thread/ch3s4jhh09wnff3tscqnb6btp2zlp2r1
[2] https://lists.apache.org/thread/lwjfdr7c1ypo77r4rwojdk7kxx2sw4sx

Best,
Lijie
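To make the point about affecting the failure handling concrete: a plugin that returns advice, rather than only adding labels, could influence the restart decision. The sketch below follows the shape Zhu Zhu suggests in the quoted reply (an immutable context, and a result carrying labels, canRestart and a restart backoff that Flink applies before invoking the next plugin). All type names are hypothetical and not part of FLIP-304, and keeping plugin calls off the JobMaster main thread is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of chaining failure-handling plugins over an immutable context. */
final class FailureAdvisorChain {

    /** Immutable view handed to each plugin (no setters). */
    static final class FailureContext {
        final Throwable cause;
        final Map<String, String> labels;
        final boolean canRestart;
        final long restartBackoffMs;

        FailureContext(Throwable cause, Map<String, String> labels, boolean canRestart, long restartBackoffMs) {
            this.cause = cause;
            this.labels = Map.copyOf(labels);
            this.canRestart = canRestart;
            this.restartBackoffMs = restartBackoffMs;
        }
    }

    /** Immutable advice returned by a plugin. */
    static final class Advice {
        final Map<String, String> labels;
        final boolean canRestart;
        final long restartBackoffMs;

        Advice(Map<String, String> labels, boolean canRestart, long restartBackoffMs) {
            this.labels = Map.copyOf(labels);
            this.canRestart = canRestart;
            this.restartBackoffMs = restartBackoffMs;
        }
    }

    interface FailureHandlingAdvisor {
        Advice advise(FailureContext context);
    }

    /** Applies each plugin's advice to a fresh context before invoking the next plugin. */
    static FailureContext applyAll(FailureContext initial, List<FailureHandlingAdvisor> advisors) {
        FailureContext current = initial;
        for (FailureHandlingAdvisor advisor : advisors) {
            Advice advice = advisor.advise(current);
            Map<String, String> merged = new HashMap<>(current.labels);
            merged.putAll(advice.labels);
            current = new FailureContext(current.cause, merged, advice.canRestart, advice.restartBackoffMs);
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical plugin: treat a "topic deleted" failure as non-recoverable.
        FailureHandlingAdvisor classifier = ctx -> {
            boolean fatal = String.valueOf(ctx.cause.getMessage()).contains("topic deleted");
            return new Advice(Map.of("type", fatal ? "USER" : "UNKNOWN"), !fatal && ctx.canRestart, ctx.restartBackoffMs);
        };
        FailureContext start = new FailureContext(new RuntimeException("topic deleted"), Map.of(), true, 1000L);
        FailureContext end = applyAll(start, List.of(classifier));
        System.out.println(end.labels + " canRestart=" + end.canRestart);
    }
}
```

Because each plugin only returns a value, the framework stays in control of how advice is merged; here later plugins simply override earlier restart decisions, which is one possible policy among several.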

On Mon, Mar 20, 2023 at 21:39, Zhu Zhu wrote:

> Hi Panagiotis,
>
> Thanks for creating this proposal! It's good to enable Flink to handle
> different errors in different ways in a pluggable manner.
>
> There are requests for flexible restart strategies from time to time, for
> different strategies of restart backoff time, or to suppress restarting
> on certain errors. Therefore, I think it's better that the proposed
> failure handling plugin can also support custom restart strategies.
>
> Maybe we can call it FailureHandlingAdvisor which provides more
> information (labels) and gives advice (restart backoff time, whether
> to restart)? I do not have a strong opinion though, any explanatory
> name would be good.
>
> To avoid unexpected mutation, how about making the context immutable
> and letting the plugin return an immutable result? i.e. remove the setters
> from the context, and let the plugin method return a result which
> contains `labels`, `canRestart` and `restartBackoffTime`. Flink should
> apply the result to the context before invoking the next plugin, so
> that the next plugin will see the updated context.
>
> The plugin should avoid taking too much time to return the result, because
> it will block the RPC and result in instability. However, it can still
> perform heavy actions in a different thread. The context can provide an
> `ioExecutor` to the plugins for reuse.
>
> Thanks,
> Zhu
>
> On Mon, Mar 20, 2023 at 20:21, Shammon FY wrote:
> >
> > Hi Panagiotis
> >
> > Thank you for your answer. I agree that `FailureListener` could be
> > stateless, and I have some thoughts as follows:
> >
> > 1. I see that listeners and tag collections are associated. When the
> > JobManager fails and restarts, how can the new listener be associated with
> > the tag collection from before the failover? Does it rely on the listener
> > loading order?
> >
> > 2. The tag collection may grow too large and cause a JobManager OOM. Do
> > we need to provide a management class that supports some eviction
> > strategies instead of a plain Collection?
> >
> > 3. Is it possible to provide a more complex data structure than a simple
> > string collection for tags in listeners, such as key-value pairs?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Mon, Mar 20, 2023 at 7:48 PM Leonard Xu  wrote:
> >
> > > Hi Panagiotis,
> > >
> > >
> > > Thank you for kicking off this discussion. Overall, the proposed feature
> > > of this FLIP makes sense to me. We have also discussed similar
> > > requirements with our users and developers, and I believe it will help
> > > many users.
> > >
> > >
> > > In terms of FLIP content, I have some thoughts:
> > >
> > > (1) In the FailureListenerContext interface, the methods
> > > FailureListenerContext#addTag and FailureListenerContext#getTags look
> > > inconsistent because they imply specific implementation details, and not
> > > all FailureListeners need to handle them, so we shouldn't put them in the
> > > interface. Minor: the comment "UDF loading" on the getUserClassLoader()
> > > method looks like a typo; IIUC it should return the classloader of the
> > > current job.
> > >
> > > (2) Regarding the implementation in ExecutionFailureHandler#handleFailure,
> > > some custom listeners may perform heavy IO operations, such as reporting
> > > to their monitoring system. The current logic appears to process them in
> > > the JobMaster's main thread, and I recommend not doing this kind of
> > > processing in the main thread.
> > >
> > > (3) The results of the FailureListener's processing and the
> > > FailureHandlingResult returned by ExecutionFailureHandler are not
> > > related. I think these two should be closely related, since the
> > > motivation of this FLIP is to make current failure handling more
> > > flexible. From this perspective, different listeners should have the
> > > opportunity to affect the job's failure handling flow. For example, if a
> > > Flink job is configured with a RestartStrategy with a huge number of
> > > retries, but the Kafka topic of the Source has been deleted, the job
> > > will fail over continuously. In this case, the user should have their
> > > listener determine whether this failure is recoverable or
> > > unrecoverable, and then wrap the processing result into
> > > FailureHandlingResult.unrecoverable(xx) and pass it to the JobMaster;
> > > this approach would be more flexible.
> > >
> > > (4) All FLIPs have an important section named Public Interfaces. The
> > > current FLIP mixes the interface section and the implement

Re: [VOTE] Release 1.17.0, release candidate #3

2023-03-18 Thread Lijie Wang
+1 (binding)

 -  Built from the source code
 -  Verified the signature and checksum
 -  Ran both streaming/batch jobs on yarn cluster
 -  Checked the website PR
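Of the checks listed, signature verification is normally done with GPG against the published KEYS file; the sketch below covers only the checksum half in plain Java. It assumes the `.sha512` file's first whitespace-separated token is the hex digest, and the file names in the usage message are illustrative.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: compare a file's SHA-512 with the digest in its .sha512 companion file. */
final class Sha512Check {

    static String sha512Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-512").digest(data);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // negative bytes are formatted as unsigned
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-512 ships with standard JDKs; treat its absence as a setup error.
            throw new IllegalStateException(e);
        }
    }

    /** Assumes the first whitespace-separated token of the .sha512 content is the hex digest. */
    static boolean matches(byte[] artifact, String sha512FileContent) {
        String trimmed = sha512FileContent.trim();
        if (trimmed.isEmpty()) {
            return false;
        }
        String expected = trimmed.split("\\s+")[0];
        return expected.equalsIgnoreCase(sha512Hex(artifact));
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: Sha512Check flink-1.17.0-src.tgz flink-1.17.0-src.tgz.sha512");
            return;
        }
        byte[] artifact = Files.readAllBytes(Path.of(args[0]));
        String checksumFile = Files.readString(Path.of(args[1]), StandardCharsets.UTF_8);
        System.out.println(matches(artifact, checksumFile) ? "checksum OK" : "checksum MISMATCH");
    }
}
```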

Best,
Lijie

On Fri, Mar 17, 2023 at 22:02, Qingsheng Ren wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #3 for the version 1.17.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
>
> * JIRA release notes [1], and the pull request adding release note for
> users [2]
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [3], which are signed with the key with
> fingerprint A1BD477F79D036D2C30CA7DBCA8AEEC2F6EB040B [4],
> * all artifacts to be deployed to the Maven Central Repository [5],
> * source code tag "release-1.17.0-rc3" [6],
> * website pull request listing the new release and adding announcement blog
> post [7].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
> [2] https://github.com/apache/flink/pull/22146
> [3] https://dist.apache.org/repos/dist/dev/flink/flink-1.17.0-rc3/
> [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> [5] https://repository.apache.org/content/repositories/orgapacheflink-1600
> [6] https://github.com/apache/flink/releases/tag/release-1.17.0-rc3
> [7] https://github.com/apache/flink-web/pull/618
>
> Thanks,
> Martijn and Matthias, Leonard and Qingsheng
>


[jira] [Created] (FLINK-31114) Batch job fails with IllegalStateException when using adaptive batch scheduler

2023-02-16 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31114:
--

 Summary: Batch job fails with IllegalStateException when using 
adaptive batch scheduler
 Key: FLINK-31114
 URL: https://issues.apache.org/jira/browse/FLINK-31114
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Lijie Wang
Assignee: Lijie Wang
 Fix For: 1.17.0


This is caused by FLINK-30942. Currently, if two job vertices have the same
input and the same parallelism (even if the parallelism is -1), they will share
partitions. However, after FLINK-30942, the scheduler may change the job
vertices' parallelism before scheduling. As a result, two job vertices can have
the same parallelism in the compilation phase (in which case they share
partitions) but different parallelism in the scheduling phase, which then
causes the following exception:

{code:java}
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 37 more
Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
    at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    ... 38 more
{code}

Putting the following test into {{AdaptiveBatchSchedulerITCase}} can reproduce 
the problem:

{code:java}
@Test
void testDifferentConsumerParallelism() throws Exception {
    final Configuration configuration = createConfiguration();
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(8);

    final DataStream<Long> source1 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source1")
                    .slotSharingGroup("group1");

    final DataStream<Long> source2 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source2")
                    .slotSharingGroup("group2");

    // map1 reads source1 via a forward edge (joining source1's forward group)
    // and also reads source2.
    source1.forward()
            .union(source2)
            .map(new NumberCounter())
            .name("map1")
            .slotSharingGroup("group3");

    // map2 also reads source2, i.e. the same input as map1.
    source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");

    env.execute();
}
{code}







[jira] [Created] (FLINK-31079) Release Testing: Verify FLINK-29663 Further improvements of adaptive batch scheduler

2023-02-14 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31079:
--

 Summary: Release Testing: Verify FLINK-29663 Further improvements 
of adaptive batch scheduler
 Key: FLINK-31079
 URL: https://issues.apache.org/jira/browse/FLINK-31079
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


This task aims to verify FLINK-29663, which improves the adaptive batch
scheduler.

Before FLINK-29663, the adaptive batch scheduler distributed subpartitions
according to the number of subpartitions, so that different downstream subtasks
consumed roughly the same number of subpartitions. This leads to imbalanced
load across downstream tasks when the subpartitions contain different amounts
of data.

To solve this problem, FLINK-29663 lets the adaptive batch scheduler distribute
subpartitions according to the amount of data, so that different downstream
subtasks consume roughly the same amount of data. Note that this currently only
takes effect for All-To-All edges.

The documentation of the adaptive batch scheduler can be found
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler].

One can verify it by deliberately creating data skew on All-To-All edges.
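To make the difference concrete, the sketch below splits eight subpartitions, one of them skewed, between two downstream subtasks: once by count and once greedily by bytes. It is a simplified illustration, not the adaptive batch scheduler's actual algorithm, and the sizes are made up.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative comparison of count-based vs. data-based subpartition ranges (not Flink's code). */
final class SubpartitionSplit {

    /** Equal-count split: every subtask gets roughly the same number of subpartitions. */
    static List<int[]> byCount(int numSubpartitions, int n) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            int start = (int) ((long) i * numSubpartitions / n);
            int end = (int) ((long) (i + 1) * numSubpartitions / n); // exclusive
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    /** Greedy data-volume split: close a range once it holds about 1/n of the total bytes. */
    static List<int[]> byBytes(long[] bytes, int n) {
        long total = Arrays.stream(bytes).sum();
        long target = (total + n - 1) / n; // ceil(total / n)
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        long acc = 0;
        for (int i = 0; i < bytes.length; i++) {
            acc += bytes[i];
            // The last subtask takes whatever remains, so stop closing ranges at n - 1.
            if (acc >= target && ranges.size() < n - 1) {
                ranges.add(new int[] {start, i + 1});
                start = i + 1;
                acc = 0;
            }
        }
        if (start < bytes.length) {
            ranges.add(new int[] {start, bytes.length});
        }
        return ranges;
    }

    /** Total bytes consumed by each subtask for a given split. */
    static long[] load(long[] bytes, List<int[]> ranges) {
        long[] result = new long[ranges.size()];
        for (int r = 0; r < ranges.size(); r++) {
            for (int i = ranges.get(r)[0]; i < ranges.get(r)[1]; i++) {
                result[r] += bytes[i];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] skewed = {100, 10, 10, 10, 10, 10, 10, 40}; // assumed sizes, first subpartition skewed
        System.out.println("by count: " + Arrays.toString(load(skewed, byCount(skewed.length, 2))));
        System.out.println("by bytes: " + Arrays.toString(load(skewed, byBytes(skewed, 2))));
    }
}
{code}

With this input the count-based split gives loads of 130 and 70, while the byte-based split gives 100 and 100. Under extreme skew the greedy rule can produce fewer than n ranges, which a real implementation would have to handle.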





[jira] [Created] (FLINK-31071) Release Testing: Verify FLIP-283 Use adaptive batch scheduler as default scheduler for batch jobs

2023-02-14 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31071:
--

 Summary: Release Testing: Verify FLIP-283 Use adaptive batch 
scheduler as default scheduler for batch jobs
 Key: FLINK-31071
 URL: https://issues.apache.org/jira/browse/FLINK-31071
 Project: Flink
  Issue Type: Sub-task
Reporter: Lijie Wang


This task aims to verify [FLIP-283 Use adaptive batch scheduler as default
scheduler for batch jobs|https://issues.apache.org/jira/browse/FLINK-30682].
The documentation of the adaptive batch scheduler can be found
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler].

Things to verify:
1. Verify that the adaptive batch scheduler is the default scheduler for batch
jobs. By default, Flink will automatically decide the parallelism of operators
in batch jobs if the scheduler is not specified.
2. Verify that the configuration options
(execution.batch.adaptive.auto-parallelism.xxx) take effect. Also verify that
the default parallelism set via parallelism.default or
StreamExecutionEnvironment#setParallelism() is used as the upper bound of the
allowed parallelism if execution.batch.adaptive.auto-parallelism.max-parallelism
is not configured.
3. Verify the final data produced are correct.
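Check 2 can be stated as a small rule. The sketch assumes the decided parallelism is the input volume divided by a per-task data volume (rounded up), clamped between a minimum and the upper bound; the per-task volume and the minimum stand in for the elided execution.batch.adaptive.auto-parallelism.xxx options and are assumptions, not the scheduler's code.

{code:java}
/** Sketch of a data-volume-based parallelism decision that respects the upper bound. */
final class AutoParallelism {

    /**
     * @param configuredMax value of execution.batch.adaptive.auto-parallelism.max-parallelism, or null if unset
     * @param defaultParallelism parallelism.default / StreamExecutionEnvironment#setParallelism()
     */
    static int upperBound(Integer configuredMax, int defaultParallelism) {
        return configuredMax != null ? configuredMax : defaultParallelism;
    }

    /** Assumed rule: ceil(inputBytes / bytesPerTask), clamped into [minParallelism, upperBound]. */
    static int decide(long inputBytes, long bytesPerTask, int minParallelism, Integer configuredMax, int defaultParallelism) {
        long wanted = (inputBytes + bytesPerTask - 1) / bytesPerTask;
        int upper = upperBound(configuredMax, defaultParallelism);
        return (int) Math.max(minParallelism, Math.min(upper, wanted));
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        System.out.println(decide(10 * gib, gib, 1, null, 8)); // max-parallelism unset: capped at 8
        System.out.println(decide(10 * gib, gib, 1, 16, 8));   // max-parallelism = 16: 10
    }
}
{code}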





Re: [ANNOUNCE] New Apache Flink Committer - Jing Ge

2023-02-14 Thread Lijie Wang
Congratulations, Jing Ge !

Best,
Lijie

On Tue, Feb 14, 2023 at 21:47, Sergey Nuyanzin wrote:

> Congratulations, Jing Ge!
>
> On Tue, Feb 14, 2023 at 2:47 PM Rui Fan  wrote:
>
> > Congratulations, Jing!
> >
> > Best,
> > Rui Fan
> >
> > On Tue, Feb 14, 2023 at 19:36 Yuepeng Pan  wrote:
> >
> > > Congratulations, Jing Ge !
> > >
> > > Best,
> > > Yuepeng Pan
> > >
> > > At 2023-02-14 15:49:24, "godfrey he"  wrote:
> > > >Hi everyone,
> > > >
> > > >On behalf of the PMC, I'm very happy to announce Jing Ge as a new Flink
> > > >committer.
> > > >
> > > >Jing has been consistently contributing to the project for over 1 year.
> > > >He authored more than 50 PRs and reviewed more than 40 PRs, mainly
> > > >focusing on the connector, test, and documentation modules.
> > > >He was very active on the mailing list (more than 90 threads) last year,
> > > >which includes participating in a lot of dev discussions (30+),
> > > >providing many effective suggestions for FLIPs and answering
> > > >many user questions. He was the Flink Forward 2022 keynote speaker,
> > > >helping to promote Flink, and a trainer for Flink troubleshooting and
> > > >performance tuning in the Flink Forward 2022 training program.
> > > >
> > > >Please join me in congratulating Jing for becoming a Flink committer!
> > > >
> > > >Best,
> > > >Godfrey
> > >
> >
>
>
> --
> Best regards,
> Sergey
>


[jira] [Created] (FLINK-31055) The dynamic flag of stream graph does not take effect when translating the transformations

2023-02-13 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-31055:
--

 Summary: The dynamic flag of stream graph does not take effect 
when translating the transformations
 Key: FLINK-31055
 URL: https://issues.apache.org/jira/browse/FLINK-31055
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, the dynamic flag of the stream graph is not set when [translating
transformations|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L324].
However, the dynamic flag is already used during translation
([here|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java#L696]),
so we should set it before translating.
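A toy illustration of why the order matters, assuming the flag is currently only set after translation has already read it. The classes are hypothetical stand-ins, not StreamGraph or StreamGraphGenerator.

{code:java}
/** Hypothetical illustration of the ordering problem; these are not Flink's classes. */
final class DynamicFlagOrdering {

    static final class Graph {
        boolean dynamic;
        String translatedMode;

        // Translation reads the flag, so it must already be set at this point.
        void translate() {
            translatedMode = dynamic ? "dynamic" : "static";
        }
    }

    /** Buggy order: the flag is set only after translation has read it. */
    static Graph generateSettingFlagLate(boolean dynamic) {
        Graph g = new Graph();
        g.translate();
        g.dynamic = dynamic;
        return g;
    }

    /** Order suggested by the ticket: set the flag before translating. */
    static Graph generateSettingFlagFirst(boolean dynamic) {
        Graph g = new Graph();
        g.dynamic = dynamic;
        g.translate();
        return g;
    }

    public static void main(String[] args) {
        System.out.println(generateSettingFlagLate(true).translatedMode);  // static
        System.out.println(generateSettingFlagFirst(true).translatedMode); // dynamic
    }
}
{code}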





Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-12 Thread Lijie Wang
Hi Konstantin,

I checked the usage of ExecutionConfig in the Kinesis and Kafka sinks:
- Kinesis sink: ExecutionConfig is not used by the Kinesis sink. The one that
uses getAutoWatermarkInterval is the Kinesis source; Joao may have made a
mistake.
- Kafka sink: isObjectReuseEnabled and ExecutionConfig are used in the upsert
Kafka table sink. The upsert Kafka table sink obtains the ExecutionConfig
through DataStreamSinkProvider; however, this approach cannot be used for
DataStream sinks and other SinkRuntimeProviders.

Besides, I know that all the JDBC sinks (DataStream/Table) need
isObjectReuseEnabled and ExecutionConfig. The JDBC sink buffers the records it
receives and only flushes them when the buffer is full, a periodic timer
fires, or a checkpoint happens. The JDBC sinks decide whether to buffer copies
or the original records based on isObjectReuseEnabled: when object reuse is
enabled, we should buffer copies (because the content of the objects may
change before the flush); otherwise we should buffer the original records. The
sinks also need the ExecutionConfig to create a TypeSerializer to copy the
records.

Actually, the upsert Kafka table sink is similar to the JDBC sink. I think all
sinks with "buffer records" behavior need isObjectReuseEnabled and
ExecutionConfig.

Best,
Lijie
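The copy-or-not decision described above can be sketched as a small buffering writer. This is a hypothetical class, not the JDBC connector's code: a UnaryOperator stands in for TypeSerializer#copy, and the objectReuseEnabled flag stands in for ExecutionConfig#isObjectReuseEnabled().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Sketch of a buffering writer that copies records only when object reuse is enabled. */
final class BufferingWriter<T> {
    private final boolean objectReuseEnabled; // would come from the execution config
    private final UnaryOperator<T> copier;    // stands in for TypeSerializer#copy
    private final int maxBufferedRecords;
    private final Consumer<List<T>> flusher;  // e.g. executing a JDBC batch
    private final List<T> buffer = new ArrayList<>();

    BufferingWriter(boolean objectReuseEnabled, UnaryOperator<T> copier, int maxBufferedRecords, Consumer<List<T>> flusher) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
        this.maxBufferedRecords = maxBufferedRecords;
        this.flusher = flusher;
    }

    void write(T record) {
        // With object reuse, the runtime may mutate `record` after this call returns,
        // so a copy is buffered; otherwise keeping the original reference is safe.
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
        if (buffer.size() >= maxBufferedRecords) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Mutable record type used to simulate object reuse. */
    static final class Row {
        int value;

        Row(int value) {
            this.value = value;
        }
    }

    public static void main(String[] args) {
        List<Integer> flushed = new ArrayList<>();
        BufferingWriter<Row> writer = new BufferingWriter<>(true, r -> new Row(r.value), 10,
                batch -> batch.forEach(r -> flushed.add(r.value)));
        Row reused = new Row(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i; // the same instance is mutated, as with object reuse
            writer.write(reused);
        }
        writer.flush();
        System.out.println(flushed); // [1, 2, 3]; buffering the reused instance itself would give [3, 3, 3]
    }
}
```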

On Sat, Feb 4, 2023 at 01:41, Konstantin Knauf wrote:

> Hi everyone,
>
> If I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis &
> Kinesis already use the Sink2 API. How were those implemented without
> exposing the ExecutionConfig?
>
> Best,
>
> Konstantin
>
>
> On Wed, Feb 1, 2023 at 12:28, Lijie Wang <
> wangdachui9...@gmail.com> wrote:
>
> > +1 for Option 2, if we can abstract a "ReadableExecutionConfig"
> > interface (containing all is/get methods) and let ExecutionConfig
> > implement ReadableExecutionConfig.
> >
> > Best,
> > Lijie
> >
> > On Tue, Jan 17, 2023 at 20:39, João Boto wrote:
> >
> > > Hi all,
> > >
> > > As establishing a read-only contract seems to be the consensus approach,
> > > after talking to Lijie we saw two ways of doing this:
> > >
> > > Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> > > like UnmodifiableConfiguration)
> > > Pros:
> > > - we have all the get methods
> > > - no need to change TypeInformation.createSerializer(ExecutionConfig
> > > config)
> > > Cons:
> > > - we have to override 34 methods that modify things
> > > - new methods added to ExecutionConfig will need to be overridden in
> > > UnmodifiableExecutionConfig
> > >
> > >
> > > Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
> > > Pros:
> > > - it is a new class, so we don't need to override anything
> > > - modifications to ExecutionConfig don't affect this class
> > > Cons:
> > > - need to change TypeInformation by adding
> > > createSerializer(UnmodifiableExecutionConfig config)
> > > - need to add all get methods, or only the ones needed (this could be a
> > > pro)
> > >
> > >
> > > Which option do you think is better?
> > >
> > >
> > >
> > > On 2023/01/13 14:15:04 Joao Boto wrote:
> > > > Hi flink devs,
> > > >
> > > > I'd like to start a discussion thread for FLIP-287[1].
> > > > This comes from an offline discussion with @Lijie Wang, stemming from
> > > > FLIP-239[2], specifically for the sink[3].
> > > >
> > > > Basically, the idea is to expose the ExecutionConfig and JobId on
> > > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > > the current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on,
> > > > which rely on RuntimeContext.
> > > >
> > > > Comments are welcome!
> > > > Thanks,
> > > >
> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > > >
> > >
> >
>
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>


Re: [ANNOUNCE] New Apache Flink Committer - Weijie Guo

2023-02-12 Thread Lijie Wang
Congratulations, Weijie!

Best,
Lijie

On Mon, Feb 13, 2023 at 11:45, Xintong Song wrote:

> Hi everyone,
>
> On behalf of the PMC, I'm very happy to announce Weijie Guo as a new Flink
> committer.
>
> Weijie has been consistently contributing to the project for over 1 year.
> He mainly works on Flink runtime and shuffle parts. He has authored 66 PRs
> and reviewed 34 PRs. He is the major contributor of FLIP-235 Hybrid Shuffle
> Mode. He has also helped with a lot of other maintenance work in Flink's
> runtime and shuffle components, including bug fixes, usability
> improvements, testability enhancements, etc.
>
> Please join me in congratulating Weijie for becoming a Flink committer!
>
> Best,
> Xintong
>


[jira] [Created] (FLINK-30972) E2e tests always fail in phase "Prepare E2E run"

2023-02-08 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30972:
--

 Summary: E2e tests always fail in phase "Prepare E2E run"
 Key: FLINK-30972
 URL: https://issues.apache.org/jira/browse/FLINK-30972
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI, Tests
Reporter: Lijie Wang








[jira] [Created] (FLINK-30942) Fix the bug that the decided parallelism by adaptive batch scheduler may be larger than the max parallelism

2023-02-07 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30942:
--

 Summary: Fix the bug that the decided parallelism by adaptive 
batch scheduler may be larger than the max parallelism
 Key: FLINK-30942
 URL: https://issues.apache.org/jira/browse/FLINK-30942
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Lijie Wang
Assignee: Lijie Wang
 Fix For: 1.17.0


Currently, when using the adaptive batch scheduler, the vertex parallelism
decided by the forward group may be larger than the global max parallelism
(which is configured by option {{parallelism.default}} or
{{execution.batch.adaptive.auto-parallelism.max-parallelism}}, see FLINK-30686
for details), which causes the following exception:

{code:java}
Caused by: java.lang.IllegalArgumentException: Vertex's parallelism should be smaller than or equal to vertex's max parallelism.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo.setParallelism(DefaultVertexParallelismInfo.java:95)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.setParallelism(ExecutionJobVertex.java:317)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.changeJobVertexParallelism(AdaptiveBatchScheduler.java:385)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:284)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:183)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:745)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
    ... 30 more
{code}







[jira] [Created] (FLINK-30917) The user configured max parallelism does not take effect when using adaptive batch scheduler

2023-02-06 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30917:
--

 Summary: The user configured max parallelism does not take effect 
when using adaptive batch scheduler
 Key: FLINK-30917
 URL: https://issues.apache.org/jira/browse/FLINK-30917
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, the adaptive batch scheduler only respects the global maximum
parallelism (which is configured by option {{parallelism.default}} or
{{execution.batch.adaptive.auto-parallelism.max-parallelism}}, see FLINK-30686
for details) when deciding the parallelism of job vertices. The maximum
parallelism of vertices configured by the user through {{setMaxParallelism}}
is not respected.

In this ticket, we will change the behavior so that the user-configured max
parallelism is also respected.
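A small sketch of the intended rule, assuming the decided parallelism is simply capped by both bounds and that -1 marks an unset user max parallelism; the class and method names are hypothetical, not the scheduler's code.

{code:java}
/** Sketch: upper bound for a vertex's decided parallelism when the user also set setMaxParallelism. */
final class VertexParallelismBound {
    static final int NOT_SET = -1; // assumed marker for "no user-configured max parallelism"

    static int upperBound(int globalMaxParallelism, int userMaxParallelism) {
        // Before the change only the global bound applied; afterwards the stricter of the two wins.
        return userMaxParallelism == NOT_SET
                ? globalMaxParallelism
                : Math.min(globalMaxParallelism, userMaxParallelism);
    }

    static int decide(int wantedParallelism, int globalMaxParallelism, int userMaxParallelism) {
        return Math.min(wantedParallelism, upperBound(globalMaxParallelism, userMaxParallelism));
    }

    public static void main(String[] args) {
        System.out.println(decide(100, 128, NOT_SET)); // 100
        System.out.println(decide(100, 128, 32));      // 32
    }
}
{code}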





Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-01 Thread Lijie Wang
+1 for Option 2, if we can abstract a "ReadableExecutionConfig"
interface (containing all is/get methods) and let ExecutionConfig implement
ReadableExecutionConfig.

Best,
Lijie
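A minimal sketch of the proposed read-only contract, using simplified stand-in types rather than Flink's real ExecutionConfig: ReadableExecutionConfig exposes only is/get methods, a mutable config implements it, and a hypothetical sink init context hands out only the read-only view.

```java
/** Read-only view: only is/get methods. */
interface ReadableExecutionConfig {
    boolean isObjectReuseEnabled();

    long getAutoWatermarkInterval();
}

/** Simplified stand-in for the mutable config; not Flink's real ExecutionConfig. */
final class MutableExecutionConfig implements ReadableExecutionConfig {
    private boolean objectReuse;
    private long autoWatermarkInterval = 200L; // arbitrary value for the sketch

    @Override
    public boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    @Override
    public long getAutoWatermarkInterval() {
        return autoWatermarkInterval;
    }

    MutableExecutionConfig enableObjectReuse() {
        objectReuse = true;
        return this;
    }
}

/** Hypothetical sink init context that only exposes the read-only view. */
final class SinkInitContextSketch {
    private final ReadableExecutionConfig config;

    SinkInitContextSketch(MutableExecutionConfig config) {
        this.config = config;
    }

    ReadableExecutionConfig getExecutionConfig() {
        return config;
    }

    public static void main(String[] args) {
        SinkInitContextSketch ctx = new SinkInitContextSketch(new MutableExecutionConfig().enableObjectReuse());
        ReadableExecutionConfig view = ctx.getExecutionConfig();
        System.out.println("objectReuse=" + view.isObjectReuseEnabled());
        // view.enableObjectReuse(); // does not compile: setters are not part of the read-only interface
    }
}
```

One caveat of this design choice: the view is the mutable object itself, so a caller could still downcast it. A separate wrapper class, as in Option 2 as originally described, avoids that at the cost of maintaining the getters in two places.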

On Tue, Jan 17, 2023 at 20:39, João Boto wrote:

> Hi all,
>
> As establishing a read-only contract seems to be the consensus approach,
> after talking to Lijie we saw two ways of doing this:
>
> Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> like UnmodifiableConfiguration)
> Pros:
> - we have all the get methods
> - no need to change TypeInformation.createSerializer(ExecutionConfig
> config)
> Cons:
> - we have to override 34 methods that modify things
> - new methods added to ExecutionConfig will need to be overridden in
> UnmodifiableExecutionConfig
>
>
> Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
> Pros:
> - it is a new class, so we don't need to override anything
> - modifications to ExecutionConfig don't affect this class
> Cons:
> - need to change TypeInformation by adding
> createSerializer(UnmodifiableExecutionConfig config)
> - need to add all get methods, or only the ones needed (this could be a pro)
>
>
> Which option do you think is better?
>
>
>
> On 2023/01/13 14:15:04 Joao Boto wrote:
> > Hi flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang, stemming from
> > FLIP-239[2], specifically for the sink[3].
> >
> > Basically, the idea is to expose the ExecutionConfig and JobId on
> > SinkV2#InitContext. These changes are necessary to correctly migrate the
> > current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, which
> > rely on RuntimeContext.
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
> >
>


Re: [SUMMARY] Flink 1.17 release sync 23rd of January, 2023

2023-01-26 Thread Lijie Wang
Hi Martijn,

Thanks for your detailed explanation. Honestly, FLINK-30624 is unlikely
to be completed before the 26th, but it's likely to be completed before
the 31st. Considering that there will be a period of time between the feature
freeze and the release, I think we have enough time to see whether it is
actually fixed.

In summary, I would prefer to extend the deadline for "revert regression
commits" to the 31st.

Best,
Lijie

On Thu, Jan 26, 2023 at 16:22, Martijn Visser wrote:

> Hi Lijie,
>
> You're right, that's not clearly phrased. Ideally we would like to get it
> fixed before the 26th, because that gives us a couple of days to monitor
> the benchmarks to see if the regression is actually fixed. If a contributor
> is actively working on a ticket, we could consider delaying the branch cut
> (not the feature freeze) until the issue is resolved. The exception is if
> the fix would take something like more than a week after the 31st; then it
> makes more sense to revert the commit that introduced the regression.
>
> Let me know what you think.
>
> Best regards,
>
> Martijn
>
> On Thu, Jan 26, 2023 at 01:11, Lijie Wang wrote:
>
> > Hi Martijn,
> >
> > I'm working on FLINK-30624, and it may take a while to be resolved. Do you
> > mean we should resolve it before the 26th? I thought the deadline was
> > the 31st (the date of the feature freeze).
> >
> > Best,
> > Lijie
> >
> > On Wed, Jan 25, 2023 at 18:07, Martijn Visser wrote:
> >
> > > Hi everyone,
> > >
> > > A summary of the release sync of yesterday:
> > >
> > > - We still have 3 performance regressions (
> > > https://issues.apache.org/jira/browse/FLINK-30623,
> > > https://issues.apache.org/jira/browse/FLINK-30625,
> > > https://issues.apache.org/jira/browse/FLINK-30624) that are being
> > > worked on but need to be resolved before the release branch cut on the
> > > 31st. If we can't merge PRs to resolve them (by Thursday the 26th at the
> > > latest), we will revert the commits that introduced the regressions.
> > > - There are 3 release blockers from a test perspective:
> > > https://issues.apache.org/jira/browse/FLINK-29405,
> > > https://issues.apache.org/jira/browse/FLINK-30727 and
> > > https://issues.apache.org/jira/browse/FLINK-29427. If you are assigned
> > > to one of these tickets, please make sure that you have marked it as
> > > "In Progress".
> > > - The feature freeze starts on Tuesday the 31st of January, and the
> > > release branch will be cut as soon as the blockers have been resolved.
> > > When the release branch has been cut, release testing will start.
> > >
> > > Best regards,
> > >
> > > Qingsheng, Leonard, Matthias and Martijn
> > >
> >
>


Re: [SUMMARY] Flink 1.17 release sync 23rd of January, 2023

2023-01-25 Thread Lijie Wang
Hi Martijn,

I'm working on FLINK-30624, and it may take a while to be resolved. Do you
mean we should resolve it before the 26th? I thought the deadline was
the 31st (the date of the feature freeze).

Best,
Lijie

On Wed, Jan 25, 2023 at 18:07, Martijn Visser wrote:

> Hi everyone,
>
> A summary of the release sync of yesterday:
>
> - We still have 3 performance regressions (
> https://issues.apache.org/jira/browse/FLINK-30623,
> https://issues.apache.org/jira/browse/FLINK-30625,
> https://issues.apache.org/jira/browse/FLINK-30624) that are being worked
> on but need to be resolved before the release branch cut on the 31st. If
> we can't merge PRs to resolve them (by Thursday the 26th at the latest), we
> will revert the commits that introduced the regressions.
> - There are 3 release blockers from a test perspective:
> https://issues.apache.org/jira/browse/FLINK-29405,
> https://issues.apache.org/jira/browse/FLINK-30727 and
> https://issues.apache.org/jira/browse/FLINK-29427. If you are assigned to
> one of these tickets, please make sure that you have marked it as "In
> Progress".
> - The feature freeze starts on Tuesday the 31st of January, and the release
> branch will be cut as soon as the blockers have been resolved. When the
> release branch has been cut, release testing will start.
>
> Best regards,
>
> Qingsheng, Leonard, Matthias and Martijn
>


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-01-18 Thread Lijie Wang
Hi Martijn,

Thanks for driving this. I have only one concern, which is about Sink.InitContext.

Will Sink.InitContext also be changed to @Public? As described in
FLIP-287, Sink.InitContext currently still lacks some information that is
necessary to migrate existing connectors to the new sink. If it is marked as
@Public/stable, we can no longer modify it in the future (since most
connectors have not been migrated to SinkV2 yet, we may find that we need
more information via InitContext in future migrations).

Best,
Lijie

On Wed, Jan 18, 2023 at 21:13, Yun Tang wrote:

> SinkV2 was introduced in Flink 1.15 and has been annotated as
> @PublicEvolving since day one [1]. Per FLIP-197, we can promote it to
> @Public since it has already existed for two releases.
> I didn't find a FLIP discussing the process for deprecating APIs, but
> considering that SinkFunction has effectively been stale for some time, I
> think we can deprecate it once SinkV2 is @Public.
>
> Thus, +1 (binding) for this proposal.
>
> [1] https://issues.apache.org/jira/browse/FLINK-2
>
> Best
> Yun Tang
>
> 
> From: Martijn Visser 
> Sent: Wednesday, January 18, 2023 18:50
> To: dev ; Jing Ge ; Yun Tang <
> myas...@live.com>
> Subject: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction
>
> Hi all,
>
> While discussing FLIP-281 [1] the discussion also turned to the
> SinkFunction and the SinkV2 API. For a broader discussion I'm opening up a
> separate discussion thread.
>
> As Yun Tang has mentioned in that discussion thread, it would be a good
> time to deprecate the SinkFunction to avoid the need to introduce new
> functionality to (soon-to-be) deprecated APIs. Jing rightfully mentioned
> that it would be confusing to deprecate the SinkFunction if its successor is
> not yet marked as @Public (it's currently @PublicEvolving).
>
> My proposal would be to promote the SinkV2 API to @Public in Flink 1.17
> and mark the SinkFunction as @Deprecated in Flink 1.17.
>
> The original Sink interface was introduced in Flink 1.12 with FLIP-143 [2]
> and extended with FLIP-177 in Flink 1.14 [3] and has been improved on
> further as Sink V2 via FLIP-191 in Flink 1.15 [4].
>
> Looking at the API stability graduation process [5], the fact that Sink V2
> was introduced in Flink 1.15 means that a promotion to @Public is already
> warranted (there have been two releases, 1.15 and 1.16, since it was
> introduced). Combined with the fact that SinkV2 is the result of iterating
> on the original Sink API since Flink 1.12, I would argue that the promotion
> is overdue.
>
> If we promote the Sink API to @Public, I think we should also immediately
> mark the SinkFunction as @Deprecated.
>
> Looking forward to your thoughts.
>
> Best regards,
>
> Martijn
>
>
> [1] https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
>
>


Re: [Discuss] Checkout the CURRENT_SNAPSHOT_VERSION branch of the flink-benchmarks repo during each major release

2023-01-17 Thread Lijie Wang
+1 for the proposal

Best,
Lijie

On Tue, Jan 17, 2023 at 21:54, Martijn Visser wrote:

> Hi Yun Tang,
>
> +1 for the proposal. Thanks for driving this.
>
> Best regards,
>
> Martijn
>
> On Mon, Jan 16, 2023 at 12:29, Yanfei Lei wrote:
>
> > Hi Yun,
> >
> > Thanks for kicking off this discussion, +1 for the proposal.
> > In the past, we occasionally encountered flink-benchmarks compilation
> > failures caused by code updates in the flink repo, like FLINK-28931 [1].
> > Managing the flink-benchmarks repository by release branch can help us
> > handle these cases better.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-28931
> >
> > Best,
> > Yanfei
> >
> > On Mon, Jan 16, 2023 at 16:27, Qingsheng Ren wrote:
> >
> > >
> > > Thanks for the proposal, Yun!
> > >
> > > +1 for managing the flink-benchmarks repository by branch. I haven't
> > > looked at the benchmark repo much, but I assume that having only one
> > > branch for testing multiple Flink versions requires a lot of adaptive
> > > code. This will also make it easier to compare performance between
> > > different Flink versions.
> > >
> > > We can start from 1.17 and cut a new branch in flink-benchmarks every
> > > time before releasing.
> > >
> > > Best,
> > > Qingsheng
> > >
> > > On Mon, Jan 16, 2023 at 1:02 PM Yun Tang  wrote:
> > >
> > > > Hi all,
> > > >
> > > > More than two years ago, we migrated the flink-benchmarks repo under
> > > > the Apache project [1], and we planned to make the flink-benchmarks
> > > > repo align with different versions of Flink.
> > > > However, I somehow never finished this work [2]. As a result, we still
> > > > cannot build the master branch of flink-benchmarks against flink-1.13
> > > > due to some incompatible changes, e.g., we removed the Scala suffix
> > > > from many modules in Flink 1.15.
> > > > The release-1.17 feature freeze date is near, and I think we can
> > > > continue this work by adding steps to the release documentation [3]
> > > > to check out the CURRENT_SNAPSHOT_VERSION branch of the
> > > > flink-benchmarks repo, just as we currently do for the flink-docker
> > > > repo. Since the original discussion was started more than two years
> > > > ago, I think it's better to start a new discussion thread and cc all
> > > > flink-1.17 release managers.
> > > >
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-17280
> > > > [2]
> > > >
> >
> >
> https://issues.apache.org/jira/browse/FLINK-16850?focusedCommentId=17202288&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17202288
> > > > [3]
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
> > > >
> > > >
> > > >
> >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-16 Thread Lijie Wang
Hi Joao,

Thanks for driving this FLIP. +1 for exposing a read-only ExecutionConfig;
users/developers are not expected to modify the ExecutionConfig in a Sink.

Can we introduce the read-only ExecutionConfig directly in this FLIP? There
is no need for a separate FLIP, because currently it will only be used in
InitContext and will not affect other user-facing interfaces. WDYT? cc @Gunnar
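
A minimal plain-Java sketch of the idea, assuming a hypothetical `ReadOnlyExecutionConfig` interface that exposes only `isObjectReuseEnabled()`, the one setting Joao says the sinks need. `BufferingWriter` and its `copier` are hypothetical stand-ins for a sink writer and a serializer's `copy()`; none of this is Flink API. It shows why the flag matters: when object reuse is enabled, the caller may mutate the same record instance after `write()` returns, so a writer that buffers records has to copy them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical read-only contract (not an actual Flink interface).
interface ReadOnlyExecutionConfig {
    boolean isObjectReuseEnabled();
}

// Hypothetical sink writer that buffers records until a later flush.
final class BufferingWriter<T> {
    private final boolean objectReuse;
    private final UnaryOperator<T> copier; // stand-in for a serializer's copy()
    private final List<T> buffer = new ArrayList<>();

    BufferingWriter(ReadOnlyExecutionConfig config, UnaryOperator<T> copier) {
        // The writer can read the flag but has no way to modify the config.
        this.objectReuse = config.isObjectReuseEnabled();
        this.copier = copier;
    }

    void write(T record) {
        // With object reuse, the caller may mutate `record` after this call,
        // so the buffered reference must be a defensive copy.
        buffer.add(objectReuse ? copier.apply(record) : record);
    }

    List<T> buffered() {
        return buffer;
    }
}
```

Because the writer only ever sees the read-only view, exposing it through InitContext would not let a sink change the job configuration.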

Best,
Lijie


On Mon, Jan 16, 2023 at 19:22, João Boto wrote:

> Hi Jing Ge,
> Thanks for your response.
>
> Reviewing all the connectors listed above, I realized that we need the
> full ExecutionConfig: it is required to create the serializer correctly
> when objectReuse is enabled, because we call
> TypeInformation.createSerializer(ExecutionConfig config).
>
> In the PoC we used the ExecutionConfig, so I didn't notice this... :(
>
> Regards
>
> On 2023/01/14 00:01:52 Jing Ge wrote:
> > Hi Joao,
> >
> > Thanks for bringing this up. Whether to expose internal domain instances
> > depends on your requirements. Technically, it is even possible to expose
> > the RuntimeContext [1] (though that must be considered very carefully).
> > Since you mentioned that you only need to know whether objectReuse is
> > enabled, how about exposing just isObjectReuseEnabled instead of the whole
> > ExecutionConfig? The idea is to keep the scope as small as possible while
> > still satisfying the requirement. If more information from
> > ExecutionConfig is needed later, we can still refactor the code properly
> > once there is a strong motivation.
> >
> > Best regards,
> > Jing
> >
> > [1]
> >
> https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279
> >
> > On Fri, Jan 13, 2023 at 6:19 PM João Boto  wrote:
> >
> > > Hi Gunnar,
> > > Thanks for your time and response...
> > >
> > > I think the problem you want to solve is the exposure of the
> > > ExecutionConfig (which can be mutated), no?
> > > The configuration is not mutated; we only need to know whether
> > > objectReuse is enabled.
> > > This is already exposed on RuntimeContext, and we thought to keep it
> > > similar to simplify migrations. But as said, for this migration we
> > > only need isObjectReuseEnabled from the ExecutionConfig, and we could
> > > expose only this setting.
> > >
> > > Best regards,
> > >
> > >
> > > On 2023/01/13 15:50:09 Gunnar Morling wrote:
> > > > Hey Joao,
> > > >
> > > > Thanks for this FLIP! One question on the proposed interface changes:
> > > > is it expected that the configuration is *mutated* via the InitContext
> > > > passed to Sink::createWriter()? If that's not the case, how about
> > > > establishing a read-only contract representing the current
> > > > configuration and passing in that one instead? That would probably
> > > > deserve its own FLIP upon which yours here then would depend. Later
> > > > on, other contracts which effectively shouldn't modify a config could
> > > > use that one, too.
> > > >
> > > > Note I don't mean to stall your efforts here, but I thought it'd be a
> > > > good idea to bring it up and gauge the general interest in this.
> > > >
> > > > Best,
> > > >
> > > > --Gunnar
> > > >
> > > > On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> > > > >
> > > > > Hi flink devs,
> > > > >
> > > > > I'd like to start a discussion thread for FLIP-287[1].
> > > > > This comes from an offline discussion with @Lijie Wang about
> > > > > FLIP-239 [2], especially regarding the sink [3].
> > > > >
> > > > > Basically, the goal is to expose the ExecutionConfig and JobID on
> > > > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > > > current sinks that rely on RuntimeContext, such as JdbcSink and
> > > > > KafkaTableSink, to SinkV2.
> > > > >
> > > > > Comments are welcome!
> > > > > Thanks,
> > > > >
> > > > > [1]
> > > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > > [2]
> > > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > > >
> > >
> >
>


[jira] [Created] (FLINK-30670) Ignore broadcast bytes when computing parallelism and input infos

2023-01-12 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30670:
--

 Summary: Ignore broadcast bytes when computing parallelism and input infos
 Key: FLINK-30670
 URL: https://issues.apache.org/jira/browse/FLINK-30670
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, when calculating the parallelism, we include the broadcast bytes in
the data volume compared against
"jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task", and cap the
broadcast ratio at 0.5. Since broadcast bytes are generally relatively small,
we can ignore them to simplify the logic.
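
A simplified sketch of the two calculations. The "before" variant is an assumption based on the description above (capped broadcast bytes reduce each task's data budget), not the actual scheduler code; class, method and parameter names are hypothetical, and the result is clamped to an assumed [min, max] parallelism range.

```java
final class ParallelismSketch {
    // Cap on the broadcast share of a task's budget, as mentioned in the ticket.
    static final double BROADCAST_CAP_RATIO = 0.5;

    /** Assumed "before": capped broadcast bytes eat into each task's data budget. */
    static int withBroadcast(long nonBroadcastBytes, long broadcastBytes,
                             long avgBytesPerTask, int min, int max) {
        long cappedBroadcast =
                Math.min(broadcastBytes, (long) (avgBytesPerTask * BROADCAST_CAP_RATIO));
        long budgetPerTask = avgBytesPerTask - cappedBroadcast;
        return clamp(ceilDiv(nonBroadcastBytes, budgetPerTask), min, max);
    }

    /** "After": broadcast bytes are ignored entirely. */
    static int ignoringBroadcast(long nonBroadcastBytes, long avgBytesPerTask, int min, int max) {
        return clamp(ceilDiv(nonBroadcastBytes, avgBytesPerTask), min, max);
    }

    // Ceiling division for non-negative a and positive b.
    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    private static int clamp(long value, int min, int max) {
        return (int) Math.max(min, Math.min(max, value));
    }
}
```

For example, with 1000 non-broadcast bytes, 200 broadcast bytes and a 100-byte target per task, the first variant yields 20 and the second yields 10.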



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For Batch Job

2023-01-12 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Thu, Jan 12, 2023 at 15:56, Martijn Visser wrote:

> +0 (binding)
>
> On Tue, Jan 10, 2023 at 13:11, yuxia wrote:
>
> > +1 (non-binding).
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Zhu Zhu" 
> > To: "dev" 
> > Sent: Tuesday, January 10, 2023 5:50:39 PM
> > Subject: Re: [VOTE] FLIP-281: Sink Supports Speculative Execution For Batch Job
> >
> > +1 (binding)
> >
> > Thanks,
> > Zhu
> >
> > On Thu, Jan 5, 2023 at 10:37, Biao Liu wrote:
> > >
> > > Hi Martijn,
> > >
> > > Sure, thanks for the reminder about the holiday period.
> > > Looking forward to your feedback!
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Thu, 5 Jan 2023 at 03:07, Martijn Visser 
> > > wrote:
> > >
> > > > Hi Biao,
> > > >
> > > > To be honest, I haven't read the FLIP yet since this is still a
> > > > holiday period in Europe. I would like to read it in the next few
> > > > days. Can you keep the vote open a little longer?
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Wed, Jan 4, 2023 at 1:31 PM Biao Liu  wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for all the feedback!
> > > > >
> > > > > Based on the discussion [1], we seem to have a consensus, so I'd
> > > > > like to start a vote on FLIP-281: Sink Supports Speculative
> > > > > Execution For Batch Job [2]. The vote will last for 72 hours, unless
> > > > > there is an objection or insufficient votes.
> > > > >
> > > > > [1]
> https://lists.apache.org/thread/l05m6cf8fwkkbpnjtzbg9l2lo40oxzd1
> > > > > [2]
> > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > >
> > > > > Thanks,
> > > > > Biao /'bɪ.aʊ/
> > > > >
> > > >
> >
>


[jira] [Created] (FLINK-30641) docs_404_check fail

2023-01-11 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30641:
--

 Summary: docs_404_check fail
 Key: FLINK-30641
 URL: https://issues.apache.org/jira/browse/FLINK-30641
 Project: Flink
  Issue Type: Bug
Reporter: Lijie Wang


{code:java}
Cloning into 'flink-connector-rabbitmq'...
Note: switching to '325b6ba8d866bb02204736229aa54ade304119a3'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c 

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

Start building sites … 
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:45:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:46:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:53:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:54:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:62:21": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:63:21": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:103:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/table/formats/overview.md:104:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/datastream/dynamodb": "/home/vsts/work/1/s/docs/content/docs/connectors/datastream/overview.md:43:22": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/datastream/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/datastream/overview.md:44:34": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/datastream/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/datastream/overview.md:45:35": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/dynamodb": "/home/vsts/work/1/s/docs/content/docs/connectors/table/overview.md:70:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content/docs/connectors/table/overview.md:76:20": page not found
ERROR 2023/01/12 06:49:21 [en] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content/docs/connectors/table/overview.md:82:20": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content.zh/docs/connectors/table/formats/overview.md:45:20": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content.zh/docs/connectors/table/formats/overview.md:46:20": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content.zh/docs/connectors/table/formats/overview.md:53:20": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/firehose": "/home/vsts/work/1/s/docs/content.zh/docs/connectors/table/formats/overview.md:54:20": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref "docs/connectors/table/kinesis": "/home/vsts/work/1/s/docs/content.zh/docs/connectors/table/formats/overview.md:62:21": page not found
ERROR 2023/01/12 06:49:30 [zh] REF_NOT_FOUND: Ref 
"docs/connectors/table/firehose": 
"/home/vsts

[jira] [Created] (FLINK-30631) Limit the max number of subpartitons consumed by each downstream task

2023-01-11 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30631:
--

 Summary: Limit the max number of subpartitons consumed by each downstream task
 Key: FLINK-30631
 URL: https://issues.apache.org/jira/browse/FLINK-30631
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


In the current implementation (FLINK-25035), when the upstream vertex
parallelism is much greater than the downstream vertex parallelism, downstream
tasks may end up with a large number of channels. For example, for A -> B with
an all-to-all edge and a max parallelism of 1000, if the parallelism of A is
1000 and the parallelism of B is decided to be 1, the only subtask of B will
consume 1000 * 1000 subpartitions, resulting in a large overhead for
processing channels.

In this ticket, we temporarily address this issue by limiting the max number of
subpartitions consumed by each downstream task. The ultimate solution should be
to support a single channel consuming multiple subpartitions.
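
A back-of-the-envelope sketch of the numbers in the example, assuming each producer has as many subpartitions as the max parallelism and that subpartition index ranges are split evenly across consumers. The per-consumer limit is a hypothetical parameter; the ticket does not state a value, and the class and method names are illustrative only.

```java
final class SubpartitionLimitSketch {
    /** Max subpartitions one consumer reads on an all-to-all edge with evenly split index ranges. */
    static long maxSubpartitionsPerConsumer(
            int upstreamParallelism, int subpartitionsPerProducer, int downstreamParallelism) {
        // Each consumer takes a range of subpartition indices from every producer.
        long maxRangeSize = ceilDiv(subpartitionsPerProducer, downstreamParallelism);
        return upstreamParallelism * maxRangeSize;
    }

    /** Smallest downstream parallelism that keeps every consumer at or below maxPerConsumer. */
    static int minParallelismForLimit(
            int upstreamParallelism, int subpartitionsPerProducer, long maxPerConsumer) {
        long maxRangeSize = maxPerConsumer / upstreamParallelism; // indices a consumer may take
        if (maxRangeSize == 0) {
            throw new IllegalArgumentException("limit is below one subpartition per producer");
        }
        return (int) ceilDiv(subpartitionsPerProducer, maxRangeSize);
    }

    // Ceiling division for non-negative a and positive b.
    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```

With 1000 producers of 1000 subpartitions each, a downstream parallelism of 1 gives 1,000,000 subpartitions per consumer; with a hypothetical limit of 32768, the smallest allowed parallelism is 32 (index ranges of at most 32, so at most 32,000 subpartitions per consumer).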



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs

2023-01-10 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Tue, Jan 10, 2023 at 11:31, Junrui Lee wrote:

> Hi all,
>
> Thanks for all the feedback so far.
> Based on the discussion[1], we have come to a consensus,
> so I would like to start a vote on FLIP-283: Use adaptive
> batch scheduler as default scheduler for batch jobs[2].
>
> The vote will last for at least 72 hours (Jan 12th at 12:00 GMT)
> unless there is an objection or insufficient votes.
>
> [1] https://lists.apache.org/thread/qwrh22do8scghz79vy852pqx2ny4jqv6
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs
>


Re: [ANNOUNCE] New Apache Flink Committer - Lincoln Lee

2023-01-09 Thread Lijie Wang
Congratulations, Lincoln!

Best,
Lijie

On Tue, Jan 10, 2023 at 12:07, Jingsong Li wrote:

> Congratulations, Lincoln!
>
> Best,
> Jingsong
>
> On Tue, Jan 10, 2023 at 11:56 AM Leonard Xu  wrote:
> >
> > Congratulations, Lincoln!
> >
> > Impressive work in streaming semantics, well deserved!
> >
> >
> > Best,
> > Leonard
> >
> >
> > > On Jan 10, 2023, at 11:52 AM, Jark Wu  wrote:
> > >
> > > Hi everyone,
> > >
> > > On behalf of the PMC, I'm very happy to announce Lincoln Lee as a new
> > > Flink committer.
> > >
> > > Lincoln Lee has been a long-term Flink contributor since 2017. He
> > > mainly works on Flink SQL and has driven several important FLIPs,
> > > e.g., FLIP-232 (Retry Async I/O), FLIP-234 (Retryable Lookup Join) and
> > > FLIP-260 (TableFunction Finish). Besides, he has also contributed a lot
> > > to streaming semantics, including the non-determinism problem and the
> > > message ordering problem.
> > >
> > > Please join me in congratulating Lincoln for becoming a Flink
> > > committer!
> > >
> > > Cheers,
> > > Jark Wu
> >
>


[jira] [Created] (FLINK-30604) Remove the limitation of that parallelism decided by adaptive batch scheduler must be power-of-two

2023-01-09 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30604:
--

 Summary: Remove the limitation of that parallelism decided by adaptive batch scheduler must be power-of-two
 Key: FLINK-30604
 URL: https://issues.apache.org/jira/browse/FLINK-30604
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Lijie Wang
 Fix For: 1.17.0


In FLINK-26517, we added a limitation that the parallelism decided by the
adaptive batch scheduler must be 2^N (so that subpartitions can be evenly
distributed to downstream tasks). After FLINK-29666, we can evenly distribute
data to downstream tasks, so we can remove the limitation.
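
A minimal count-based sketch of why a power-of-two parallelism is not needed for an even split: consecutive index ranges computed with integer division differ in size by at most one, for any parallelism. This only balances the number of subpartitions; FLINK-29666 may balance by other criteria (such as data volume), which the sketch does not model. The class and method names are hypothetical.

```java
final class EvenRangeSketch {
    /**
     * Returns {start, end} (end exclusive) of the subpartition indices assigned to
     * downstream task taskIndex. Range sizes differ by at most one, so any
     * parallelism works, not only powers of two.
     */
    static int[] range(int numSubpartitions, int parallelism, int taskIndex) {
        // Long arithmetic avoids overflow of taskIndex * numSubpartitions.
        int start = (int) ((long) taskIndex * numSubpartitions / parallelism);
        int end = (int) ((long) (taskIndex + 1) * numSubpartitions / parallelism);
        return new int[] {start, end};
    }
}
```

For 10 subpartitions and 3 downstream tasks this yields [0, 3), [3, 6) and [6, 10).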



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30601) Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance

2023-01-08 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30601:
--

 Summary: Omit "setKeyContextElement" call for non-keyed stream/operators to improve performance
 Key: FLINK-30601
 URL: https://issues.apache.org/jira/browse/FLINK-30601
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, Flink sets the correct key context (by calling 
[setKeyContextElement|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java#:~:text=input.setKeyContextElement(castRecord)%3B])
 before processing each record; this is typically used to extract the key from 
the record and pass that key to the state backend. 

However, "setKeyContextElement" is obviously not needed for non-keyed 
streams/operators, in which case we should omit the "setKeyContextElement" calls 
to improve performance. Note that setKeyContextElement is an interface method, 
so each call requires an interface-table lookup, which further increases the 
method call overhead.
 
We ran the following program as a benchmark with parallelism=1 and object reuse 
enabled. The results are averaged across 5 runs for each setup. After applying 
the proposed change, the average execution time dropped from 88.39 s to 78.76 s, 
i.e., about 10.9% less execution time (roughly 12.2% higher throughput).
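
For reference, the two averages imply the following relative changes. For a fixed amount of input, throughput is inversely proportional to execution time, so the two percentages differ:

$$
\frac{88.39 - 78.76}{88.39} \approx 10.9\% \ \text{(less execution time)}, \qquad
\frac{88.39}{78.76} - 1 \approx 12.2\% \ \text{(more throughput)}
$$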
 
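{code:java}
// env is a StreamExecutionEnvironment; settings match the benchmark setup above.
env.setParallelism(1);
env.getConfig().enableObjectReuse();
env.fromSequence(1, 10L)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .map(x -> x)
    .addSink(new DiscardingSink<>());
env.execute();
{code}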
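
A plain-Java sketch of the idea, not the actual patch: decide once, when the chain is built, whether the operator is keyed, so non-keyed operators pay no per-record key extraction or extra interface call. `Operator`, `chainingOutput` and the `setCurrentKey` callback are hypothetical stand-ins for Flink internals.

```java
import java.util.function.Consumer;
import java.util.function.Function;

final class KeyContextSketch {
    // Hypothetical stand-in for a one-input stream operator.
    interface Operator<T> {
        void processElement(T record);
    }

    /**
     * Builds the per-record path once. For a non-keyed operator (keySelector == null)
     * records go straight to the operator; otherwise the key is extracted and handed
     * to the (hypothetical) state-backend hook before the record is processed.
     */
    static <T> Consumer<T> chainingOutput(
            Operator<T> operator, Function<T, ?> keySelector, Consumer<Object> setCurrentKey) {
        if (keySelector == null) {
            return operator::processElement; // no key-context work per record
        }
        return record -> {
            setCurrentKey.accept(keySelector.apply(record));
            operator.processElement(record);
        };
    }
}
```

The branch is evaluated once per chain rather than once per record, which is where the saving would come from.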
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE]FLIP-266: Simplify network memory configurations for TaskManager

2023-01-04 Thread Lijie Wang
+1 (binding)

Best,
Lijie

On Wed, Jan 4, 2023 at 13:03, 17610775726 <17610775...@163.com> wrote:

>
>
> +1 (non-binding)
>
>
> Best
> JasonLee
>
>
>  Replied Message 
> | From | Yuxin Tan |
> | Date | 01/3/2023 17:56 |
> | To |  |
> | Subject | [VOTE]FLIP-266: Simplify network memory configurations for
> TaskManager |
> Hi all,
>
> Thanks for all the feedback so far.
> Based on the discussion[1], we have come to a consensus,
> so I would like to start a vote on FLIP-266: Simplify network
> memory configurations for TaskManager[2].
>
> The vote will last for at least 72 hours (Jan 6th at 11:00 GMT)
> unless there is an objection or insufficient votes.
>
> [1]https://lists.apache.org/thread/yzfn5yf2tf8o165ns337bvfmb7t8h7mf
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>
> Best,
> Yuxin
>


Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

2023-01-03 Thread Lijie Wang
Hi Biao,

Thanks for explaining how SinkV2 knows the right subtask attempt. I have
no more questions, +1 for the proposal.
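
A minimal plain-Java sketch of the two points Biao makes in the quoted reply below, under assumptions: per-attempt temporary directories following the `$root_dir/task-$taskNumber/attempt-$attemptNumber` layout, and a committer that keeps only committables carrying the finished attempt number. `Committable`, `tempDir` and `selectFinished` are hypothetical; the map of finished attempts stands in for what `getFinishedAttempt(int subtaskIndex)` would return.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class AttemptIsolationSketch {
    /** Hypothetical committable: which attempt of which subtask wrote which file. */
    static final class Committable {
        final int subtaskIndex;
        final int attemptNumber;
        final Path file;

        Committable(int subtaskIndex, int attemptNumber, Path file) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
            this.file = file;
        }
    }

    /** root/task-N/attempt-M: concurrent attempts never share a directory. */
    static Path tempDir(Path root, int taskNumber, int attemptNumber) {
        return root.resolve("task-" + taskNumber).resolve("attempt-" + attemptNumber);
    }

    /** Keeps only committables written by the attempt recorded as finished for their subtask. */
    static List<Committable> selectFinished(
            List<Committable> all, Map<Integer, Integer> finishedAttemptBySubtask) {
        return all.stream()
                .filter(c -> Integer.valueOf(c.attemptNumber)
                        .equals(finishedAttemptBySubtask.get(c.subtaskIndex)))
                .collect(Collectors.toList());
    }
}
```

Output from attempts that were not marked as finished is simply never committed and can be cleaned up separately.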

Best,
Lijie

On Wed, Dec 28, 2022 at 17:22, Biao Liu wrote:

> Thanks for all your feedback!
>
> To @Yuxia,
>
> > What is the sink expected to do to isolate data produced by speculative
> > executions? IIUC, if a task fails over, it also generates a new attempt.
> > Does that make a difference in isolating the produced data?
>
>
> Yes, this is different from the task failover scenario. The attempt number
> matters more for speculative execution than for failover, because in the
> failover scenario only one instance of a subtask can be running at a time.
>
> Let's take FileSystemOutputFormat as an example. In the failover scenario,
> the temporary directory for the produced data can be something like
> "$root_dir/task-$taskNumber/". In the initialization phase, the subtask
> deletes and re-creates the temporary directory.
>
> However, this does not work in the speculative execution scenario, because
> several attempts of the same subtask might be running at the same time, and
> they might delete, re-create and write the same temporary directory
> concurrently. The temporary directory should instead be something like
> "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to
> expose the attempt number to the Sink implementation to isolate the data.
>
>
> To @Lijie,
>
> > I have a question about this: does SinkV2 need to do the same thing?
>
>
> Actually, yes.
>
> > Should we/users do it in the committer? If yes, how does the committer
> > know which one is the right subtask attempt?
>
>
> Yes, we/users should do it in the committer.
>
> In the current design, the Committer of Sink V2 should get the information
> about which subtask attempt is the right one from the "committable data"
> produced by the SinkWriter. Take the FileSink as an example: the
> "committable data" sent to the Committer contains the full paths of the
> files produced by the SinkWriter. Users could also pass the attempt number
> from the SinkWriter to the Committer through the "committable data".
>
> In the "Rejected Alternatives -> Introduce a way to clean leaked data of
> Sink V2" section of the FLIP document, we discussed some of the reasons
> why we didn't provide an API like the one for OutputFormat.
>
> To @Jing Zhang
>
> > I have a question about this: Speculative execution of Committer will be
> > disabled.
> >
> > I agree with your point, and I have seen similar requirements to disable
> > speculative execution for specified operators.
> >
> > However, this requirement is not supported currently. I think there should
> > be some place to describe how to support it.
>
>
> In this FLIP design, speculative execution of the Committer of Sink V2 will
> be disabled by Flink. It's not optional, and users cannot change it.
> And as you said, "disable speculative execution for specified operators" is
> not supported in this FLIP, because it's a bit out of its scope ("Sink
> Supports Speculative Execution For Batch Job"). I think it's better to start
> another FLIP to discuss it; "Fine-grained control of enabling speculative
> execution for operators" could be the title of that FLIP. There we can
> discuss how to enable or disable speculative execution for specified
> operators, including the Committer and pre/post-committer of Sink V2.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 28 Dec 2022 at 11:30, Jing Zhang  wrote:
>
> > Hi Biao,
> >
> > Thanks for driving this FLIP. Supporting speculative execution of sinks
> > is important.
> >
> > I have a question about this: Speculative execution of Committer will be
> > disabled.
> >
> > I agree with your point, and I have seen similar requirements to disable
> > speculative execution for specified operators.
> >
> > However, this requirement is not supported currently. I think there should
> > be some place to describe how to support it.
> >
> > Best,
> > Jing Zhang
> >
> > On Tue, Dec 27, 2022 at 18:51, Lijie Wang wrote:
> >
> > > Hi Biao,
> > >
> > > Thanks for driving this FLIP.
> > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > OutputFormat, so that it knows which subtask attempt was marked as
> > > finished by the JM and can commit the right data.
> > > I have a question about this: does SinkV2 need 

[jira] [Created] (FLINK-30544) Speed up finding minimum watermark across all channels by introducing heap-based algorithm

2023-01-02 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-30544:
--

 Summary: Speed up finding minimum watermark across all channels by introducing heap-based algorithm
 Key: FLINK-30544
 URL: https://issues.apache.org/jira/browse/FLINK-30544
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Lijie Wang
 Fix For: 1.17.0


Currently, every time a task receives a watermark, it tries to update the
minimum watermark. To do so, it uses a traversal algorithm to find the minimum
watermark across all channels (see
[StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java#:~:text=private%20void-,findAndOutputNewMinWatermarkAcrossAlignedChannels,-(DataOutput%3C%3F%3E]
 for details), and the time complexity is O(N), where N is the number of
channels.

We can optimize it by introducing a heap-based algorithm, reducing the time
complexity of each update to O(log N).
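The heap-based idea above can be sketched as follows. This is a minimal, hypothetical sketch: the class `HeapMinWatermark` and its methods are illustrative names, not Flink's actual `StatusWatermarkValve` code, and it assumes per-channel watermarks only ever increase while ignoring channel idleness and alignment. An indexed min-heap keeps the channel with the smallest watermark at the root, so reading the minimum is O(1) and each watermark update costs one O(log N) sift-down.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: tracks the minimum watermark across N input channels
 * with an indexed binary min-heap. Idle/aligned channel handling is omitted.
 */
class HeapMinWatermark {
    private final long[] watermarks; // last watermark seen per channel
    private final int[] heap;        // heap[i] = channel stored at heap slot i
    private final int[] pos;         // pos[c]  = heap slot currently holding channel c

    HeapMinWatermark(int numChannels) {
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        heap = new int[numChannels];
        pos = new int[numChannels];
        for (int c = 0; c < numChannels; c++) {
            heap[c] = c; // all values equal, so the identity layout is a valid heap
            pos[c] = c;
        }
    }

    /** Minimum watermark across all channels, read from the heap root in O(1). */
    long currentMin() {
        return watermarks[heap[0]];
    }

    /**
     * Records a watermark for one channel and returns true if the overall
     * minimum advanced. A channel's value can only grow, so restoring the heap
     * only needs a sift-down from that channel's slot: O(log N).
     */
    boolean onWatermark(int channel, long watermark) {
        if (watermark <= watermarks[channel]) {
            return false; // stale or duplicate watermark: nothing changes
        }
        long oldMin = currentMin();
        watermarks[channel] = watermark;
        siftDown(pos[channel]);
        return currentMin() > oldMin;
    }

    private void siftDown(int i) {
        int n = heap.length;
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && watermarks[heap[left]] < watermarks[heap[smallest]]) {
                smallest = left;
            }
            if (right < n && watermarks[heap[right]] < watermarks[heap[smallest]]) {
                smallest = right;
            }
            if (smallest == i) {
                return; // heap property restored
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two heap slots and keeps the channel-to-slot index in sync. */
    private void swap(int a, int b) {
        int ca = heap[a];
        int cb = heap[b];
        heap[a] = cb;
        heap[b] = ca;
        pos[cb] = a;
        pos[ca] = b;
    }
}
```

A plain `java.util.PriorityQueue` alone would not give the same bound, because updating an element means `remove(Object)` plus re-insert, and `remove(Object)` is linear; tracking each channel's heap slot in `pos` is what keeps the update logarithmic.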



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-283: Use adaptive batch scheduler as default scheduler for batch jobs

2023-01-02 Thread Lijie Wang
Hi Junrui,

Thanks for driving this FLIP, +1 for this proposal. I believe it will
greatly improve the experience of batch users.

Best,
Lijie

Zhu Zhu wrote on Fri, Dec 30, 2022 at 12:40:

> Hi Junrui,
>
> Thanks for creating this FLIP!
>
> AdaptiveBatchScheduler is more powerful than DefaultScheduler in batch
> scheduling, and it also has some must-have features like speculative
> execution. It would be great if users could easily use it, without being
> required to know the underlying scheduler or configure advanced options.
>
> So generally +1 for this proposal.
>
> Regarding the configuration key renaming, as yuxia mentioned, we should
> deprecate the old keys and add new ones with the new names, to guarantee
> compatibility.
>
> Thanks,
> Zhu
>
> yuxia wrote on Fri, Dec 30, 2022 at 11:10:
> >
> > Hi, JunRui Lee.
> >
> > Thanks for driving this FLIP. It must be a good improvement for batch
> > users' experience.
> > I have a few questions about this FLIP:
> > 1: About the configuration renaming: will the old configuration options be
> > deprecated or removed directly? If users upgrade their Flink version, will
> > the old options still be honored or just ignored? If they are ignored,
> > users may need to modify their configurations after upgrading Flink.
> >
> > 2: I'm curious in which cases users would disable auto parallelism
> > derivation if they have enabled the adaptive batch scheduler. IIUC, auto
> > parallelism derivation is what the adaptive batch scheduler aims to do. If
> > users want to disable auto parallelism derivation, can't they just disable
> > the adaptive batch scheduler?
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "JunRui Lee" 
> > To: "dev" 
> > Sent: Thursday, December 29, 2022 7:45:36 PM
> > Subject: [DISCUSS] FLIP-283: Use adaptive batch scheduler as default
> > scheduler for batch jobs
> >
> > Hi, devs,
> >
> > I'd like to start a discussion about FLIP-283: Use adaptive batch
> > scheduler as default scheduler for batch jobs[1].
> >
> > In FLIP-187, we introduced an adaptive batch scheduler. The adaptive
> > batch scheduler has stronger batch scheduling capabilities, including
> > automatically deciding the parallelism of job vertices for batch
> > jobs (FLIP-187)[2], balanced data distribution (FLINK-29663)[3],
> > and speculative execution (FLIP-168)[4]. To further improve Flink's
> > batch capabilities with the adaptive batch scheduler, this FLIP
> > aims to make the adaptive batch scheduler the default batch
> > scheduler.
> >
> > Currently, users have to set several configuration options to use the
> > adaptive batch scheduler, which is not very convenient. To make the
> > adaptive batch scheduler the default batch scheduler, we need to improve
> > the out-of-the-box user experience. Therefore, we also need to
> > optimize the current adaptive batch scheduler configuration.
> >
> > Looking forward to your feedback.
> >
> > [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs
> > [2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Scheduler
> > [3]: https://issues.apache.org/jira/browse/FLINK-29663
> > [4]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> > Best regards,
> > JunRui Lee
>
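The deprecate-and-rename approach for configuration keys discussed above (and yuxia's first question about whether old keys are still honored) can be sketched as a lookup that tries the new key first and then falls back to the deprecated keys, so existing user configurations keep working after an upgrade. This is a minimal sketch under stated assumptions: `RenamedOption` is a hypothetical type, and the default value passed to it is a placeholder, not a value from the FLIP.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical config option: a new key plus deprecated keys that are still
 * honored as fallbacks, so old user configs keep working after an upgrade.
 */
final class RenamedOption {
    private final String key;
    private final List<String> deprecatedKeys;
    private final String defaultValue; // placeholder default, illustrative only

    RenamedOption(String key, List<String> deprecatedKeys, String defaultValue) {
        this.key = key;
        this.deprecatedKeys = deprecatedKeys;
        this.defaultValue = defaultValue;
    }

    /** Resolves the value: new key first, then deprecated keys in order, then the default. */
    String resolve(Map<String, String> userConfig) {
        if (userConfig.containsKey(key)) {
            return userConfig.get(key); // new key always wins when both are set
        }
        for (String oldKey : deprecatedKeys) {
            if (userConfig.containsKey(oldKey)) {
                // A real implementation would also log a deprecation warning here.
                return userConfig.get(oldKey);
            }
        }
        return defaultValue;
    }
}
```

For example, an option with new key `execution.batch.adaptive.auto-parallelism.max-parallelism` and deprecated key `jobmanager.adaptive-batch-scheduler.max-parallelism` (names assumed for illustration) resolves to the old key's value when only the old key is set. Flink's own configuration layer already offers this behavior through `ConfigOption#withDeprecatedKeys`, so an actual change would more likely declare the deprecated keys there than hand-roll the lookup.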


Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

2022-12-27 Thread Lijie Wang
Hi Biao,

Thanks for driving this FLIP.
This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
OutputFormat, so that it knows which subtask attempt is the one marked as
finished by the JM and commits the right data.
I have a question about this: does SinkV2 need to do the same thing? Should
we/users do it in the committer? If yes, how does the committer know which
subtask attempt is the right one?
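One way a sink could use the finished-attempt information is sketched below, under stated assumptions: every (subtask, attempt) pair writes to its own staging area, and a single finalization step commits only the attempt reported by `getFinishedAttempt(subtaskIndex)`. The `FinishedAttemptLookup` interface, the `AttemptIsolatingSink` class, its `finalizeAll` method and the `subtask-N/attempt-M` staging layout are all hypothetical; only the method name `getFinishedAttempt` comes from the FLIP discussion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical context mirroring the method named in the FLIP discussion. */
interface FinishedAttemptLookup {
    int getFinishedAttempt(int subtaskIndex);
}

/**
 * Hypothetical sink: each (subtask, attempt) writes to its own staging area,
 * and finalization commits only the attempt the JM marked as finished.
 */
class AttemptIsolatingSink {
    // staging key -> records written by that attempt
    private final Map<String, List<String>> staging = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    static String stagingKey(int subtaskIndex, int attemptNumber) {
        return "subtask-" + subtaskIndex + "/attempt-" + attemptNumber;
    }

    /** Called by every (possibly speculative) attempt; attempts never share a key. */
    void write(int subtaskIndex, int attemptNumber, String record) {
        staging.computeIfAbsent(stagingKey(subtaskIndex, attemptNumber), k -> new ArrayList<>())
                .add(record);
    }

    /** Runs once after all subtasks finish: promote the winning attempts, drop the rest. */
    void finalizeAll(int parallelism, FinishedAttemptLookup context) {
        for (int subtask = 0; subtask < parallelism; subtask++) {
            String winner = stagingKey(subtask, context.getFinishedAttempt(subtask));
            committed.addAll(staging.getOrDefault(winner, List.of()));
        }
        staging.clear(); // discard output of losing or failed attempts
    }

    List<String> committed() {
        return committed;
    }
}
```

Because isolation is keyed by attempt number, the same scheme also covers attempts created by failover, which is the case yuxia raises: the output of a failed or losing attempt is simply never promoted.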

Best,
Lijie

yuxia wrote on Tue, Dec 27, 2022 at 10:01:

> Hi, Biao.
> Thanks for driving this FLIP.
> After a quick look at this FLIP, I have a question about "expose the attempt
> number which can be used to isolate data produced by speculative executions
> with the same subtask id".
> What is the sink expected to do to isolate data produced by speculative
> executions? IIUC, if a task fails over, it also generates a new attempt.
> Does that make a difference in isolating the produced data?
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Biao Liu" 
> To: "dev" 
> Sent: Thursday, December 22, 2022 8:16:21 PM
> Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
>
> Hi everyone,
>
> I would like to start a discussion about making Sink support speculative
> execution for batch jobs. This proposal is a follow-up to "FLIP-168:
> Speculative Execution For Batch Job"[1]. Speculative execution is very
> valuable for batch jobs, and it would be more complete once Sink supports
> speculative execution. Please find more details in the FLIP document [2].
>
> Looking forward to your feedback.
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
>
> Thanks,
> Biao /'bɪ.aʊ/
>

