Re: Re: Re: [DISCUSS] FLIP-466: Introduce ProcessFunction Attribute in DataStream API V2

2024-07-14 Thread Xuannan Su
Hi, Wencong,
Thanks for driving the FLIP.

+1 for this FLIP.

It is a useful mechanism to let the operator provide hints for the
engine to optimize the execution.

Best regards,
Xuannan

On Sat, Jul 6, 2024 at 3:53 PM Wencong Liu  wrote:
>
> Hi Yuxin,
> Thanks for the reply.
> > For idempotence annotation, what's the specific behavior?
>
>
> StreamTask will reduce the frequency of output records. The frequency will
> have a default value and can also be set through configuration options.
> The specific rules will be described in detail in a subsequent FLIP.
>
>
> Best,
> Wencong
>
> On 2024-07-05 14:19:44, "Yuxin Tan"  wrote:
> >Hi, Wencong,
> >Thanks for driving the FLIP.
> >
> >+1 for this FLIP.
> >
> >I believe these hints will improve the performance in many use cases.
> >I only have a minor question about the Idempotence annotation. When
> >this annotation is added, how does StreamTask optimize the frequency?
> >Does it ensure a single output, or does it merely reduce the frequency
> >of the outputs?
> >
> >Best,
> >Yuxin
> >
> >
> >On Mon, Jul 1, 2024 at 16:39, Wencong Liu  wrote:
> >
> >> Hi, Jeyhun,
> >> Thanks for the reply.
> >>
> >>
> >> > Integrate these annotations with the execution plan.
> >> I believe DataStream is an imperative API, which means
> >> that the actual execution plan is essentially consistent with
> >> the computational logic the user expresses with DataStream.
> >> This is different from SQL, so supporting getExecutionPlan may
> >> not be very valuable in the short term. If it is supported later,
> >> the impact of the attributes (hints) can be considered then.
> >>
> >>
> >> > Check for misuse of attributes or ignore it.
> >> For illegal use (an attribute annotated on an inappropriate
> >> ProcessFunction), an exception will be thrown. For legal use, the
> >> framework can still choose to ignore the attribute.
> >>
> >>
> >> > A framework to include attributes.
> >> Yes, we will provide a basic framework in the implementation
> >> to help developers extend it.
> >>
> >>
> >> Best,
> >> Wencong
> >>
> >>
> >> At 2024-06-28 02:06:37, "Jeyhun Karimov"  wrote:
> >> >Hi Wencong,
> >> >
> >> >Thanks for the FLIP. +1 for it.
> >> >
> >> >Providing hints to users will enable more optimization potential for DSv2.
> >> >I have a few questions.
> >> >
> >> >I think currently, DSv2 ExecutionEnvironment does not support getting
> >> >execution plan (getExecutionPlan()).
> >> >Do you plan to integrate these annotations with the execution plan?
> >> >
> >> >Any plans to check for misuse of attributes? Or any plans for a framework
> >> >to implicitly include attributes?
> >> >
> >> >Also, now that we make analogy with SQL hints, SQL query planners usually
> >> >ignore wrong hints and continue with its best plan.
> >> >Do we want to consider this approach? Or should we throw exception
> >> whenever
> >> >the hint (attribute in this case) is wrong?
> >> >
> >> >
> >> >Regards,
> >> >Jeyhun
> >> >
> >> >
> >> >On Thu, Jun 27, 2024 at 7:47 AM Xintong Song 
> >> wrote:
> >> >
> >> >> +1 for this FLIP.
> >> >>
> >> >> I think this is similar to SQL hints, where users can provide optional
> >> >> information to help the engine execute the workload more efficiently.
> >> >> Having a unified mechanism for such kind of hints should improve
> >> usability
> >> >> compared to introducing tons of configuration knobs.
> >> >>
> >> >> Best,
> >> >>
> >> >> Xintong
> >> >>
> >> >>
> >> >>
> >> >> On Wed, Jun 26, 2024 at 8:09 PM Wencong Liu 
> >> wrote:
> >> >>
> >> >> > Hi devs,
> >> >> >
> >> >> >
> >> >> > I'm proposing a new FLIP[1] to introduce the ProcessFunction
> >> Attribute in
> >> >> > the
> >> >> > DataStream API V2. The goal is to optimize job execution by leveraging
> >> >> > additional information about users' ProcessFunction logic. The
> >> proposal
> >> >> > includes
> >> >> > the following scenarios where the ProcessFunction Attribute can
> >> >> > significantly
> >> >> > enhance optimization:
> >> >> >
> >> >> >
> >> >> > Scenario 1: If the framework recognizes that the ProcessFunction
> >> outputs
> >> >> > data
> >> >> > only after all input is received, the downstream operators need not be
> >> >> > scheduled until
> >> >> > the ProcessFunction is finished, which effectively reduces resource
> >> >> > consumption.
> >> >> > Ignoring this information could lead to premature scheduling of
> >> >> downstream
> >> >> > operators with no data to process. This scenario is addressed and
> >> >> > optimized by FLIP-331[2].
> >> >> >
> >> >> >
> >> >> > Scenario 2: For stream processing, where users are only interested in
> >> the
> >> >> > latest
> >> >> > result per key at the current time, the framework can optimize by
> >> >> > adjusting the
> >> >> > frequency of ProcessFunction outputs. This reduces shuffle data volume
> >> >> and
> >> >> > downstream operator workload. If this optimization is ignored, each
> >> new
> >> >> > input
> >> >> > would trigger a new output. This scenario is addressed and
> >> 
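
As a rough illustration of what such ProcessFunction attributes could look like on a user function, here is a short sketch in Java. The annotation names and the function class below are hypothetical placeholders, not the API proposed in FLIP-466.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical hint (Scenario 1): the function only emits output after all
// input has been consumed, so downstream operators need not be scheduled
// until it finishes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface OutputOnlyAfterEndOfInput {}

// Hypothetical hint (Scenario 2): only the latest output per key matters,
// so the runtime may reduce the output frequency.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Idempotent {}

// A user function carrying one of the hints; as discussed in the thread,
// the engine may either use the hint for optimization or safely ignore it.
@Idempotent
class LatestResultPerKeyFunction {
    // process(record, collector) would emit the latest result for the key
}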

Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-05 Thread Xuannan Su
Congratulations!

Best regards,
Xuannan

On Thu, Jun 6, 2024 at 9:53 AM Hangxiang Yu  wrote:
>
> Congratulations, Rui !
>
> On Thu, Jun 6, 2024 at 9:18 AM Lincoln Lee  wrote:
>
> > Congratulations, Rui!
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Lijie Wang  于2024年6月6日周四 09:11写道:
> >
> > > Congratulations, Rui!
> > >
> > > Best,
> > > Lijie
> > >
> > > Rodrigo Meneses  于2024年6月5日周三 21:35写道:
> > >
> > > > All the best
> > > >
> > > > On Wed, Jun 5, 2024 at 5:56 AM xiangyu feng 
> > > wrote:
> > > >
> > > > > Congratulations, Rui!
> > > > >
> > > > > Regards,
> > > > > Xiangyu Feng
> > > > >
> > > > > Feng Jin  于2024年6月5日周三 20:42写道:
> > > > >
> > > > > > Congratulations, Rui!
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Feng Jin
> > > > > >
> > > > > > On Wed, Jun 5, 2024 at 8:23 PM Yanfei Lei 
> > > wrote:
> > > > > >
> > > > > > > Congratulations, Rui!
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
> > > > > > >
> > > > > > > Luke Chen  于2024年6月5日周三 20:08写道:
> > > > > > > >
> > > > > > > > Congrats, Rui!
> > > > > > > >
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Wed, Jun 5, 2024 at 8:02 PM Jiabao Sun <
> > jiabao...@apache.org>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Congrats, Rui. Well-deserved!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jiabao
> > > > > > > > >
> > > > > > > > > Zhanghao Chen  于2024年6月5日周三
> > > 19:29写道:
> > > > > > > > >
> > > > > > > > > > Congrats, Rui!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Zhanghao Chen
> > > > > > > > > > 
> > > > > > > > > > From: Piotr Nowojski 
> > > > > > > > > > Sent: Wednesday, June 5, 2024 18:01
> > > > > > > > > > To: dev ; rui fan <
> > > 1996fan...@gmail.com>
> > > > > > > > > > Subject: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
> > > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > On behalf of the PMC, I'm very happy to announce another
> > new
> > > > > Apache
> > > > > > > Flink
> > > > > > > > > > PMC Member - Fan Rui.
> > > > > > > > > >
> > > > > > > > > > Rui has been active in the community since August 2019.
> > > During
> > > > > this
> > > > > > > time
> > > > > > > > > he
> > > > > > > > > > has contributed a lot of new features. Among others:
> > > > > > > > > >   - Decoupling Autoscaler from Kubernetes Operator, and
> > > > > supporting
> > > > > > > > > > Standalone Autoscaler
> > > > > > > > > >   - Improvements to checkpointing, flamegraphs, restart
> > > > > strategies,
> > > > > > > > > > watermark alignment, network shuffles
> > > > > > > > > >   - Optimizing the memory and CPU usage of large operators,
> > > > > greatly
> > > > > > > > > > reducing the risk and probability of TaskManager OOM
> > > > > > > > > >
> > > > > > > > > > He reviewed a significant amount of PRs and has been active
> > > > both
> > > > > on
> > > > > > > the
> > > > > > > > > > mailing lists and in Jira helping to both maintain and grow
> > > > > Apache
> > > > > > > > > Flink's
> > > > > > > > > > community. He is also our current Flink 1.20 release
> > manager.
> > > > > > > > > >
> > > > > > > > > > In the last 12 months, Rui has been the most active
> > > contributor
> > > > > in
> > > > > > > the
> > > > > > > > > > Flink Kubernetes Operator project, while being the 2nd most
> > > > > active
> > > > > > > Flink
> > > > > > > > > > contributor at the same time.
> > > > > > > > > >
> > > > > > > > > > Please join me in welcoming and congratulating Fan Rui!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Piotrek (on behalf of the Flink PMC)
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> Best,
> Hangxiang.


Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo

2024-06-05 Thread Xuannan Su
Congratulations!

Best regards,
Xuannan

On Thu, Jun 6, 2024 at 9:20 AM Jiabao Sun  wrote:
>
> Congratulations, Weijie!
>
> Best,
> Jiabao
>
> On Thu, Jun 6, 2024 at 09:17, Lincoln Lee  wrote:
>
> > Congratulations, Weijie!
> >
> > Best,
> > Lincoln Lee
> >
> >
> > ConradJam  于2024年6月5日周三 10:46写道:
> >
> > > Congratulations!
> > >
> > > Biao Geng  于2024年6月5日周三 10:44写道:
> > >
> > > > Congratulations, Weijie!
> > > > Best,
> > > > Biao Geng
> > > >
> > > >
> > > > Yun Tang  于2024年6月5日周三 10:42写道:
> > > >
> > > > > Congratulations, Weijie!
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > > 
> > > > > From: Hangxiang Yu 
> > > > > Sent: Wednesday, June 5, 2024 10:00
> > > > > To: dev@flink.apache.org 
> > > > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo
> > > > >
> > > > > Congratulations, Weijie!
> > > > >
> > > > > On Tue, Jun 4, 2024 at 11:40 PM Zhanghao Chen <
> > > zhanghao.c...@outlook.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Congrats, Weijie!
> > > > > >
> > > > > > Best,
> > > > > > Zhanghao Chen
> > > > > > 
> > > > > > From: Hang Ruan 
> > > > > > Sent: Tuesday, June 4, 2024 16:37
> > > > > > To: dev@flink.apache.org 
> > > > > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Weijie Guo
> > > > > >
> > > > > > Congratulations Weijie!
> > > > > >
> > > > > > Best,
> > > > > > Hang
> > > > > >
> > > > > > Yanfei Lei  于2024年6月4日周二 16:24写道:
> > > > > >
> > > > > > > Congratulations!
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
> > > > > > >
> > > > > > > Leonard Xu  于2024年6月4日周二 16:20写道:
> > > > > > > >
> > > > > > > > Congratulations!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Leonard
> > > > > > > >
> > > > > > > > > 2024年6月4日 下午4:02,Yangze Guo  写道:
> > > > > > > > >
> > > > > > > > > Congratulations!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yangze Guo
> > > > > > > > >
> > > > > > > > > On Tue, Jun 4, 2024 at 4:00 PM Weihua Hu <
> > > huweihua@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >> Congratulations, Weijie!
> > > > > > > > >>
> > > > > > > > >> Best,
> > > > > > > > >> Weihua
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Tue, Jun 4, 2024 at 3:03 PM Yuxin Tan <
> > > > tanyuxinw...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >>> Congratulations, Weijie!
> > > > > > > > >>>
> > > > > > > > >>> Best,
> > > > > > > > >>> Yuxin
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> Yuepeng Pan  于2024年6月4日周二 14:57写道:
> > > > > > > > >>>
> > > > > > > >  Congratulations !
> > > > > > > > 
> > > > > > > > 
> > > > > > > >  Best,
> > > > > > > >  Yuepeng Pan
> > > > > > > > 
> > > > > > > >  At 2024-06-04 14:45:45, "Xintong Song" <
> > > tonysong...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > On behalf of the PMC, I'm very happy to announce that
> > > Weijie
> > > > > Guo
> > > > > > > has
> > > > > > > >  joined
> > > > > > > > > the Flink PMC!
> > > > > > > > >
> > > > > > > > > Weijie has been an active member of the Apache Flink
> > > > community
> > > > > > for
> > > > > > > many
> > > > > > > > > years. He has made significant contributions in many
> > > > > components,
> > > > > > > > >>> including
> > > > > > > > > runtime, shuffle, sdk, connectors, etc. He has driven /
> > > > > > > participated in
> > > > > > > > > many FLIPs, authored and reviewed hundreds of PRs, been
> > > > > > > consistently
> > > > > > > >  active
> > > > > > > > > on mailing lists, and also helped with release management
> > > of
> > > > > 1.20
> > > > > > > and
> > > > > > > > > several other bugfix releases.
> > > > > > > > >
> > > > > > > > > Congratulations and welcome Weijie!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > >
> > > > > > > > > Xintong (on behalf of the Flink PMC)
> > > > > > > > 
> > > > > > > > >>>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Hangxiang.
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best
> > >
> > > ConradJam
> > >
> >


[jira] [Created] (FLINK-35475) Introduce isInternalSorterSupport to OperatorAttributes

2024-05-28 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35475:
--

 Summary: Introduce isInternalSorterSupport to OperatorAttributes
 Key: FLINK-35475
 URL: https://issues.apache.org/jira/browse/FLINK-35475
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Introduce isInternalSorterSupport to OperatorAttributes to notify Flink whether 
the operator will sort the data internally in batch mode or during backlog.
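
A rough sketch of how the attribute could be exposed, loosely modeled on the existing OperatorAttributes / OperatorAttributesBuilder pattern from FLIP-327/331. The names and method signatures below are assumptions, not the final API.

// Sketch only: names and signatures are assumptions, not the final API.
final class OperatorAttributesSketch {

    private final boolean internalSorterSupported;

    private OperatorAttributesSketch(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    /** Whether the operator sorts its input internally in batch mode or during backlog processing. */
    public boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }

    static final class Builder {
        private boolean internalSorterSupported = false;

        public Builder setInternalSorterSupported(boolean supported) {
            this.internalSorterSupported = supported;
            return this;
        }

        public OperatorAttributesSketch build() {
            return new OperatorAttributesSketch(internalSorterSupported);
        }
    }
}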



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-05-27 Thread Xuannan Su
+1 (non-binding)

Best regards,
Xuannan

On Mon, May 27, 2024 at 3:43 PM Jark Wu  wrote:
>
> +1 (binding)
>
> Best,
> Jark
>
> On Mon, 27 May 2024 at 14:29, Hang Ruan  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Hang
> >
> > On Mon, May 27, 2024 at 14:16, gongzhongqiang  wrote:
> >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Zhongqiang Gong
> > >
> > > Jane Chan  于2024年5月24日周五 09:52写道:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a vote on FLIP-457[1] after reaching a consensus
> > > through
> > > > the discussion thread[2].
> > > >
> > > > The vote will be open for at least 72 hours unless there is an
> > objection
> > > or
> > > > insufficient votes.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992
> > > > [2] https://lists.apache.org/thread/1sthbv6q00sq52pp04n2p26d70w4fqj1
> > > >
> > > > Best,
> > > > Jane
> > > >
> > >
> >


[jira] [Created] (FLINK-35461) Improve Runtime Configuration for Flink 2.0

2024-05-26 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35461:
--

 Summary: Improve Runtime Configuration for Flink 2.0
 Key: FLINK-35461
 URL: https://issues.apache.org/jira/browse/FLINK-35461
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Xuannan Su


As Flink moves toward 2.0, we have revisited all runtime configurations and 
identified several improvements to enhance user-friendliness and 
maintainability. In this FLIP, we aim to refine the runtime configuration.

 

This ticket implements all the changes discussed in 
[FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-20 Thread Xuannan Su
Hi devs,

I'm happy to announce that FLIP-450: Improve Runtime Configuration for
Flink 2.0[1] has been accepted with 3 approving
votes (3 bindings) [2]:

- Rui Fan (binding)
- Weijie Guo (binding)
- Xintong Song (binding)

There are no disapproving votes.

Thanks again to everyone who participated in the discussion and voting.

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
[2] https://lists.apache.org/thread/1m6kzm05mpfqvj45x7j3ymkjr17q89hl


Re: [VOTE] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-20 Thread Xuannan Su
Hi all,

Thank you all! Closing the vote. The result will be announced in a
separate email.

Best regards,
Xuannan

On Thu, May 16, 2024 at 10:05 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 15, 2024 at 6:53 PM weijie guo 
> wrote:
>
> > +1(binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Rui Fan <1996fan...@gmail.com> 于2024年5月15日周三 17:50写道:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Rui
> > >
> > > On Wed, May 15, 2024 at 5:01 PM Xuannan Su 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks for all the feedback about the FLIP-450: Improve Runtime
> > > > Configuration for Flink 2.0 [1] [2].
> > > >
> > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > hours(excluding weekends,until MAY 20, 12:00AM GMT) unless there is an
> > > > objection or an insufficient number of votes.
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> > > > [2] https://lists.apache.org/thread/20mkzd31607posls793hxy7mht40xp2x
> > > >
> > > >
> > > > Best regards,
> > > > Xuannan
> > > >
> > >
> >


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-15 Thread Xuannan Su
Hi David,

Thanks for the feedback!

> For the proposed deprecations, will there be recommended alternatives
> mentioned in the deprecation? If they are going to be removed for v2,
> it would be good to explicitly document the thinking and impact of the
> deprecations and consequent removals.

Some options to be deprecated are considered too complicated for users
to tweak. There will be alternatives for those deprecated options,
e.g., the option related to the floating buffers. Other options have a
reasonable default value and are hardly ever tweaked; those options
will not have an alternative, and the current default value will be
used, e.g., the options related to Netty. We will also document the
reason for each deprecation in 1.20.
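
As a rough illustration of the two cases above, here is a sketch in Java. The option keys and values are made-up examples, not the actual options touched by the FLIP; only ConfigOption/ConfigOptions are real Flink classes.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

class DeprecationSketch {
    // Case 1: a complicated option gets a simpler alternative; the new
    // option falls back to the old (made-up) key so existing configurations
    // keep working during the deprecation period.
    static final ConfigOption<Integer> SIMPLE_BUFFER_COUNT =
            ConfigOptions.key("example.network.buffers")
                    .intType()
                    .defaultValue(8)
                    .withDeprecatedKeys("example.network.buffers-per-gate.advanced");

    // Case 2: an option that is hardly ever tweaked is dropped entirely and
    // its current default becomes a hard-coded constant.
    static final int SERVER_THREADS = 4;
}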

Best,
Xuannan



On Thu, May 16, 2024 at 12:01 AM David Radley  wrote:
>
> Hi Xuannan,
> I like that you are cleaning up options that I assume are not recommended or 
> currently used in some way.
>
> I do not have experience with these options. For the proposed deprecations,
> will there be recommended alternatives mentioned in the deprecation? If they
> are going to be removed for v2, it would be good to explicitly document the
> thinking and impact of the deprecations and consequent removals.
>   Kind regards, David.
>
> From: Xuannan Su 
> Date: Tuesday, 14 May 2024 at 02:08
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration 
> for Flink 2.0
> Hi all,
>
> Thank you for all the comments and suggestions! If there are no
> further comments, I'd like to close the discussion and start the
> voting in two days.
>
> Best regards,
> Xuannan
>
>
>
> On Mon, May 13, 2024 at 3:10 PM Jeyhun Karimov  wrote:
> >
> > Hi Xuannan,
> >
> > Thanks a lot for the update. The FLIP looks good to me. +1 for it.
> >
> > Regards,
> > Jeyhun
> >
> > On Mon, May 13, 2024 at 4:45 AM Xuannan Su  wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thanks for the comment!
> > >
> > > Yes, we intended to remove the StreamPipelineOptions in 2.0. I updated
> > > the FLIP to include the information.
> > >
> > > Best regards,
> > > Xuannan
> > >
> > > On Sun, May 12, 2024 at 9:16 PM Jeyhun Karimov 
> > > wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for driving this FLIP!
> > > > I have a minor comment. Do we plan to remove StreamPipelineOptions in
> > > 2.0,
> > > > as it only contains deprecated options?
> > > >
> > > > Regards,
> > > > Jeyhun
> > > >
> > > > On Sat, May 11, 2024 at 4:40 AM Rui Fan <1996fan...@gmail.com> wrote:
> > > >
> > > > > Thanks Xuannan for the update!
> > > > >
> > > > > LGTM, +1 for this proposal.
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > > > On Sat, May 11, 2024 at 10:20 AM Xuannan Su 
> > > wrote:
> > > > >
> > > > > > Hi Rui,
> > > > > >
> > > > > > Thanks for the suggestion!
> > > > > >
> > > > > > I updated the description of
> > > > > > taskmanager.network.memory.max-overdraft-buffers-per-gate and
> > > > > > hard-coded it to 20.
> > > > > >
> > > > > > Best regards,
> > > > > > Xuannan
> > > > > >
> > > > > > On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > Thanks Xuannan for driving this proposal!
> > > > > > >
> > > > > > > > taskmanager.network.memory.max-overdraft-buffers-per-gate will 
> > > > > > > > be
> > > > > > removed
> > > > > > > and hard-coded to either 10 or 20.
> > > > > > >
> > > > > > > Currently, it's a public option. Could we determine the value of
> > > > > > > the overdraft buffer in the current FLIP?
> > > > > > >
> > > > > > > I vote 20 as the hard code value due to 2 reasons:
> > > > > > > - Removing this option means users cannot change it, it might be
> > > better
> > > > > > to
> > > > > > > turn it up.
> > > > > 

Re: [DISCUSSION] FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-05-15 Thread Xuannan Su
Hi Jane,

Thanks for driving this effort! And +1 for the proposed changes.

I have one comment on the migration plan.

For options to be moved to another module/package, I think we have to
mark the old option deprecated in 1.20 for it to be removed in 2.0,
according to the API compatibility guarantees[1]. We can introduce the
new option in 1.20 with the same option key in the intended class.
WDYT?
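
For illustration, a minimal sketch of that migration pattern in Java; the class names and the option key are placeholders, not actual FLIP-457 options.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// In 1.20 the option stays in its old class but is marked deprecated,
// pointing users to the new home.
class LegacyTableOptionsSketch {
    /** @deprecated Planned for removal in 2.0; use TableOptionsSketch#EXAMPLE_OPTION. */
    @Deprecated
    static final ConfigOption<String> EXAMPLE_OPTION =
            ConfigOptions.key("example.table.option").stringType().defaultValue("default");
}

// In 1.20 the same key is also introduced in the intended class, so user
// configuration files keep working unchanged after the old class is
// removed in 2.0.
class TableOptionsSketch {
    static final ConfigOption<String> EXAMPLE_OPTION =
            ConfigOptions.key("example.table.option").stringType().defaultValue("default");
}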

Best,
Xuannan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#api-compatibility-guarantees



On Wed, May 15, 2024 at 6:20 PM Jane Chan  wrote:
>
> Hi all,
>
> I'd like to start a discussion on FLIP-457: Improve Table/SQL Configuration
> for Flink 2.0 [1]. This FLIP revisited all Table/SQL configurations to
> improve user-friendliness and maintainability as Flink moves toward 2.0.
>
> I am looking forward to your feedback.
>
> Best regards,
> Jane
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992


[jira] [Created] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-15 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35359:
--

 Summary: General Improvement to Configuration for Flink 2.0
 Key: FLINK-35359
 URL: https://issues.apache.org/jira/browse/FLINK-35359
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Xuannan Su


As Flink moves toward version 2.0, we want to provide users with a better 
experience with the existing configuration. In this FLIP, we outline several 
general improvements to the current configuration:
 * Ensure all the ConfigOptions are properly annotated

 * Ensure all user-facing configurations are included in the documentation 
generation process

 * Make the existing ConfigOptions use the proper type

 * Mark all internally used ConfigOptions with the @Internal annotation

 

https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
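
A small sketch of what the first three items above mean in practice; the class and option below are made-up examples, not real Flink options.

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Properly annotated, so the class is part of the documented public surface
// and is picked up by the documentation generation process.
@PublicEvolving
public class ExampleOptions {

    // Uses Duration ("proper type") instead of a raw long of milliseconds.
    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
            ConfigOptions.key("example.heartbeat.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(10))
                    .withDescription("Interval between heartbeats of the example component.");
}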

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-15 Thread Xuannan Su
Hi everyone,

Thanks for all the feedback about the FLIP-450: Improve Runtime
Configuration for Flink 2.0 [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until MAY 20, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
[2] https://lists.apache.org/thread/20mkzd31607posls793hxy7mht40xp2x


Best regards,
Xuannan


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-13 Thread Xuannan Su
Hi all,

Thank you for all the comments and suggestions! If there are no
further comments, I'd like to close the discussion and start the
voting in two days.

Best regards,
Xuannan



On Mon, May 13, 2024 at 3:10 PM Jeyhun Karimov  wrote:
>
> Hi Xuannan,
>
> Thanks a lot for the update. The FLIP looks good to me. +1 for it.
>
> Regards,
> Jeyhun
>
> On Mon, May 13, 2024 at 4:45 AM Xuannan Su  wrote:
>
> > Hi Jeyhun,
> >
> > Thanks for the comment!
> >
> > Yes, we intended to remove the StreamPipelineOptions in 2.0. I updated
> > the FLIP to include the information.
> >
> > Best regards,
> > Xuannan
> >
> > On Sun, May 12, 2024 at 9:16 PM Jeyhun Karimov 
> > wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for driving this FLIP!
> > > I have a minor comment. Do we plan to remove StreamPipelineOptions in
> > 2.0,
> > > as it only contains deprecated options?
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > On Sat, May 11, 2024 at 4:40 AM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > Thanks Xuannan for the update!
> > > >
> > > > LGTM, +1 for this proposal.
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Sat, May 11, 2024 at 10:20 AM Xuannan Su 
> > wrote:
> > > >
> > > > > Hi Rui,
> > > > >
> > > > > Thanks for the suggestion!
> > > > >
> > > > > I updated the description of
> > > > > taskmanager.network.memory.max-overdraft-buffers-per-gate and
> > > > > hard-coded it to 20.
> > > > >
> > > > > Best regards,
> > > > > Xuannan
> > > > >
> > > > > On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com>
> > wrote:
> > > > > >
> > > > > > Thanks Xuannan for driving this proposal!
> > > > > >
> > > > > > > taskmanager.network.memory.max-overdraft-buffers-per-gate will be
> > > > > removed
> > > > > > and hard-coded to either 10 or 20.
> > > > > >
> > > > > > Currently, it's a public option. Could we determine the value of
> > > > > > the overdraft buffer in the current FLIP?
> > > > > >
> > > > > > I vote 20 as the hard code value due to 2 reasons:
> > > > > > - Removing this option means users cannot change it, it might be
> > better
> > > > > to
> > > > > > turn it up.
> > > > > > - Most of tasks don't use the overdraft buffer, so increasing it
> > > > doesn't
> > > > > > introduce more risk.
> > > > > >
> > > > > > Best,
> > > > > > Rui
> > > > > >
> > > > > > On Mon, May 6, 2024 at 10:47 AM Yuxin Tan 
> > > > > wrote:
> > > > > >
> > > > > > > Thanks for the effort, Xuannan.
> > > > > > >
> > > > > > > +1 for the proposal.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yuxin
> > > > > > >
> > > > > > >
> > > > > > > Xintong Song  于2024年4月29日周一 15:40写道:
> > > > > > >
> > > > > > > > Thanks for driving this effort, Xuannan.
> > > > > > > >
> > > > > > > > +1 for the proposed changes.
> > > > > > > >
> > > > > > > > Just one suggestion: Some of the proposed changes involve not
> > > > solely
> > > > > > > > changing the configuration options, but are bound to changing /
> > > > > removal
> > > > > > > of
> > > > > > > > certain features. E.g., the removal of hash-blocking shuffle
> > and
> > > > > legacy
> > > > > > > > hybrid shuffle mode, and the behavior change of overdraft
> > network
> > > > > > > buffers.
> > > > > > > > Therefore, it might be nicer to provide an implementation plan
> > > > with a
> > > > > > > list
> > > > > > > > of related tasks in the FLIP. This should not block the FLIP
> > > > though.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Xintong
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su <
> > suxuanna...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I'd like to start a discussion on FLIP-450: Improve Runtime
> > > > > > > > > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0,
> > we
> > > > have
> > > > > > > > > revisited all runtime configurations and identified several
> > > > > > > > > improvements to enhance user-friendliness and
> > maintainability. In
> > > > > this
> > > > > > > > > FLIP, we aim to refine the runtime configuration.
> > > > > > > > >
> > > > > > > > > Looking forward to everyone's feedback and suggestions. Thank
> > > > you!
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Xuannan
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > >
> >


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-12 Thread Xuannan Su
Hi Jeyhun,

Thanks for the comment!

Yes, we intended to remove the StreamPipelineOptions in 2.0. I updated
the FLIP to include the information.

Best regards,
Xuannan

On Sun, May 12, 2024 at 9:16 PM Jeyhun Karimov  wrote:
>
> Hi Xuannan,
>
> Thanks for driving this FLIP!
> I have a minor comment. Do we plan to remove StreamPipelineOptions in 2.0,
> as it only contains deprecated options?
>
> Regards,
> Jeyhun
>
> On Sat, May 11, 2024 at 4:40 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Thanks Xuannan for the update!
> >
> > LGTM, +1 for this proposal.
> >
> > Best,
> > Rui
> >
> > On Sat, May 11, 2024 at 10:20 AM Xuannan Su  wrote:
> >
> > > Hi Rui,
> > >
> > > Thanks for the suggestion!
> > >
> > > I updated the description of
> > > taskmanager.network.memory.max-overdraft-buffers-per-gate and
> > > hard-coded it to 20.
> > >
> > > Best regards,
> > > Xuannan
> > >
> > > On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com> wrote:
> > > >
> > > > Thanks Xuannan for driving this proposal!
> > > >
> > > > > taskmanager.network.memory.max-overdraft-buffers-per-gate will be
> > > removed
> > > > and hard-coded to either 10 or 20.
> > > >
> > > > Currently, it's a public option. Could we determine the value of
> > > > the overdraft buffer in the current FLIP?
> > > >
> > > > I vote 20 as the hard code value due to 2 reasons:
> > > > - Removing this option means users cannot change it, it might be better
> > > to
> > > > turn it up.
> > > > - Most of tasks don't use the overdraft buffer, so increasing it
> > doesn't
> > > > introduce more risk.
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Mon, May 6, 2024 at 10:47 AM Yuxin Tan 
> > > wrote:
> > > >
> > > > > Thanks for the effort, Xuannan.
> > > > >
> > > > > +1 for the proposal.
> > > > >
> > > > > Best,
> > > > > Yuxin
> > > > >
> > > > >
> > > > > Xintong Song  于2024年4月29日周一 15:40写道:
> > > > >
> > > > > > Thanks for driving this effort, Xuannan.
> > > > > >
> > > > > > +1 for the proposed changes.
> > > > > >
> > > > > > Just one suggestion: Some of the proposed changes involve not
> > solely
> > > > > > changing the configuration options, but are bound to changing /
> > > removal
> > > > > of
> > > > > > certain features. E.g., the removal of hash-blocking shuffle and
> > > legacy
> > > > > > hybrid shuffle mode, and the behavior change of overdraft network
> > > > > buffers.
> > > > > > Therefore, it might be nicer to provide an implementation plan
> > with a
> > > > > list
> > > > > > of related tasks in the FLIP. This should not block the FLIP
> > though.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Xintong
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su 
> > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'd like to start a discussion on FLIP-450: Improve Runtime
> > > > > > > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we
> > have
> > > > > > > revisited all runtime configurations and identified several
> > > > > > > improvements to enhance user-friendliness and maintainability. In
> > > this
> > > > > > > FLIP, we aim to refine the runtime configuration.
> > > > > > >
> > > > > > > Looking forward to everyone's feedback and suggestions. Thank
> > you!
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Xuannan
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> > > > > > >
> > > > > >
> > > > >
> > >
> >


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-10 Thread Xuannan Su
Hi Rui,

Thanks for the suggestion!

I updated the description of
taskmanager.network.memory.max-overdraft-buffers-per-gate and
hard-coded it to 20.
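
In code terms the change is roughly the following sketch; the class and constant names are placeholders, and where the constant actually lives is an implementation detail.

final class OverdraftBufferSketch {
    // Before: taskmanager.network.memory.max-overdraft-buffers-per-gate was
    // a user-facing ConfigOption. After: the option is removed and the
    // value is fixed at 20.
    static final int MAX_OVERDRAFT_BUFFERS_PER_GATE = 20;
}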

Best regards,
Xuannan

On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> Thanks Xuannan for driving this proposal!
>
> > taskmanager.network.memory.max-overdraft-buffers-per-gate will be removed
> and hard-coded to either 10 or 20.
>
> Currently, it's a public option. Could we determine the value of
> the overdraft buffer in the current FLIP?
>
> I vote for 20 as the hard-coded value for 2 reasons:
> - Removing this option means users cannot change it, so it might be better
> to turn it up.
> - Most tasks don't use the overdraft buffer, so increasing it doesn't
> introduce more risk.
>
> Best,
> Rui
>
> On Mon, May 6, 2024 at 10:47 AM Yuxin Tan  wrote:
>
> > Thanks for the effort, Xuannan.
> >
> > +1 for the proposal.
> >
> > Best,
> > Yuxin
> >
> >
> > On Mon, Apr 29, 2024 at 15:40, Xintong Song  wrote:
> >
> > > Thanks for driving this effort, Xuannan.
> > >
> > > +1 for the proposed changes.
> > >
> > > Just one suggestion: Some of the proposed changes involve not solely
> > > changing the configuration options, but are bound to changing / removal
> > of
> > > certain features. E.g., the removal of hash-blocking shuffle and legacy
> > > hybrid shuffle mode, and the behavior change of overdraft network
> > buffers.
> > > Therefore, it might be nicer to provide an implementation plan with a
> > list
> > > of related tasks in the FLIP. This should not block the FLIP though.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to start a discussion on FLIP-450: Improve Runtime
> > > > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we have
> > > > revisited all runtime configurations and identified several
> > > > improvements to enhance user-friendliness and maintainability. In this
> > > > FLIP, we aim to refine the runtime configuration.
> > > >
> > > > Looking forward to everyone's feedback and suggestions. Thank you!
> > > >
> > > > Best regards,
> > > > Xuannan
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> > > >
> > >
> >


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-10 Thread Xuannan Su
Hi Xintong,

Thanks for the suggestion! I updated the FLIP to include a list of
related tasks.

Best regards,
Xuannan

On Mon, Apr 29, 2024 at 3:40 PM Xintong Song  wrote:
>
> Thanks for driving this effort, Xuannan.
>
> +1 for the proposed changes.
>
> Just one suggestion: Some of the proposed changes involve not solely
> changing the configuration options, but are bound to changing / removal of
> certain features. E.g., the removal of hash-blocking shuffle and legacy
> hybrid shuffle mode, and the behavior change of overdraft network buffers.
> Therefore, it might be nicer to provide an implementation plan with a list
> of related tasks in the FLIP. This should not block the FLIP though.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-450: Improve Runtime
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we have
> > revisited all runtime configurations and identified several
> > improvements to enhance user-friendliness and maintainability. In this
> > FLIP, we aim to refine the runtime configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> >


[DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-04-25 Thread Xuannan Su
Hi all,

I'd like to start a discussion on FLIP-450: Improve Runtime
Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we have
revisited all runtime configurations and identified several
improvements to enhance user-friendliness and maintainability. In this
FLIP, we aim to refine the runtime configuration.

Looking forward to everyone's feedback and suggestions. Thank you!

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0


[RESULT][VOTE] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-22 Thread Xuannan Su
Hi devs,

I'm happy to announce that FLIP-442: General Improvement to
Configuration for Flink 2.0[1] has been accepted with 9 approving
votes (4 bindings) [2]:

- Rui Fan (binding)
- Zakelly Lan (binding)
- Xintong Song (binding)
- Yuxin Tan (non-binding)
- Zhu Zhu (binding)
- Jeyhun Karimov (non-binding)
- Ahmed Hamdy (non-binding)
- Muhammet Orazov (non-binding)
- Zhongqiang Gong (non-binding)


There are no disapproving votes.

Thanks again to everyone who participated in the discussion and voting.

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
[2] https://lists.apache.org/thread/sx8v66w6rogj8v3sfm4wfmb6tj5dw89y


Re: [VOTE] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-22 Thread Xuannan Su
Hi all,

Thank you all! Closing the vote. The result will be announced in a
separate email.

Best regards,
Xuannan

On Fri, Apr 19, 2024 at 10:58 AM gongzhongqiang
 wrote:
>
> +1 (non-binding)
>
>
> Best,
> Zhongqiang Gong
>
> On Wed, Apr 17, 2024 at 13:02, Xuannan Su  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback about the FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1] [2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours(excluding weekends,until APR 22, 12:00AM GMT) unless there is an
> > objection or an insufficient number of votes.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
> > [2] https://lists.apache.org/thread/15k0stwyoknhxvd643ctwjw3fd17pqwk
> >
> >
> > Best regards,
> > Xuannan
> >


[VOTE] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-16 Thread Xuannan Su
Hi everyone,

Thanks for all the feedback about the FLIP-442: General Improvement to
Configuration for Flink 2.0 [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until APR 22, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
[2] https://lists.apache.org/thread/15k0stwyoknhxvd643ctwjw3fd17pqwk


Best regards,
Xuannan


Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan

2024-04-15 Thread Xuannan Su
Congratulations Zakelly!

Best regards,
Xuannan

On Mon, Apr 15, 2024 at 4:31 PM Jing Ge  wrote:
>
> Congratulations Zakelly!
>
> Best regards,
> Jing
>
> On Mon, Apr 15, 2024 at 4:26 PM Xia Sun  wrote:
>
> > Congratulations Zakelly!
> >
> >  Best,
> >  Xia
> >
> > On Mon, Apr 15, 2024 at 16:16, Leonard Xu  wrote:
> >
> > > Congratulations Zakelly!
> > >
> > >
> > > Best,
> > > Leonard
> > > > 2024年4月15日 下午3:56,Samrat Deb  写道:
> > > >
> > > > Congratulations Zakelly!
> > >
> > >
> >


Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee

2024-04-15 Thread Xuannan Su
Congratulations, Lincoln!

Best regards,
Xuannan

On Tue, Apr 16, 2024 at 10:04 AM Hang Ruan  wrote:
>
> Congratulations, Lincoln!
>
> Best,
> Hang
>
> On Tue, Apr 16, 2024 at 09:14, yh z  wrote:
>
> > Congratulations, Lincoln!
> >
> > Best,
> > Yunhong (Swuferhong)
> >
> >
> > On Mon, Apr 15, 2024 at 18:50, Swapnal Varma  wrote:
> >
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Swapnal
> > >
> > >
> > > On Mon, 15 Apr 2024, 15:16 Jacky Lau,  wrote:
> > >
> > > > Congratulations, Lincoln!
> > > >
> > > > Best,
> > > > Jacky Lau
> > > >
> > > > Jinzhong Li  于2024年4月15日周一 15:45写道:
> > > >
> > > > > Congratulations, Lincoln!
> > > > >
> > > > > Best,
> > > > > Jinzhong Li
> > > > >
> > > > > On Mon, Apr 15, 2024 at 2:56 PM Hangxiang Yu 
> > > > wrote:
> > > > >
> > > > > > Congratulations, Lincoln!
> > > > > >
> > > > > > On Mon, Apr 15, 2024 at 10:17 AM Zakelly Lan <
> > zakelly@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Congratulations, Lincoln!
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > On Sat, Apr 13, 2024 at 12:48 AM Ferenc Csaky
> > > > >  > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Congratulations, Lincoln!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Ferenc
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Friday, April 12th, 2024 at 15:54,
> > > > lorenzo.affe...@ververica.com
> > > > > > > .INVALID
> > > > > > > >  wrote:
> > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Huge congrats! Well done!
> > > > > > > > > On Apr 12, 2024 at 13:56 +0200, Ron liu ron9@gmail.com,
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Congratulations, Lincoln!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Ron
> > > > > > > > > >
> > > > > > > > > > Junrui Lee jrlee@gmail.com 于2024年4月12日周五 18:54写道:
> > > > > > > > > >
> > > > > > > > > > > Congratulations, Lincoln!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Junrui
> > > > > > > > > > >
> > > > > > > > > > > Aleksandr Pilipenko z3d...@gmail.com 于2024年4月12日周五
> > > 18:29写道:
> > > > > > > > > > >
> > > > > > > > > > > > > Congratulations, Lincoln!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best Regards
> > > > > > > > > > > > > Aleksandr
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Hangxiang.
> > > > > >
> > > > >
> > > >
> > >
> >


Re: [DISCUSSION] FLIP-442: FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-14 Thread Xuannan Su
Hi all,

Thank you for all the comments and suggestions! If there are no
further comments, I'd like to close the discussion and start the
voting in two days.

Best regards,
Xuannan

On Fri, Apr 12, 2024 at 9:49 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi Xuannan,
>
> > Regarding the new classes, we propose updating the
> > `ConfigOptionsDocGenerator` to throw an exception and fail the build
> > if it detects any new class missing the proper annotation in the
> > future.
>
> Thanks for your feedback, it sounds good to me.
>
> Best,
> Rui
>
> On Fri, Apr 12, 2024 at 9:35 AM Xuannan Su  wrote:
>>
>> Hi Rui,
>>
>> If I understand correctly, all classes without annotations are
>> non-public by default. I'm concerned that adding an exception to this
>> rule will make it harder to understand.
>>
>> Regarding the new classes, we propose updating the
>> `ConfigOptionsDocGenerator` to throw an exception and fail the build
>> if it detects any new class missing the proper annotation in the
>> future.
>>
>> Best regards,
>> Xuannan
>>
>>
>> On Wed, Apr 10, 2024 at 2:09 PM Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Thanks Xuannan for driving this proposal!
>> >
>> > > Ensure all the ConfigOptions are properly annotated as PublicEvolving
>> >
>> > Could we add a specification directly? All XxxOptions classes are
>> > PublicEvolving by default. I'm afraid some new classes still miss
>> > PublicEvolving in the future.
>> >
>> > If we have a specification, it will be clear. And we don't need to
>> > add PublicEvolving for each XxxOptions.
>> >
>> > Best,
>> > Rui
>> >
>> > On Wed, Apr 10, 2024 at 1:54 PM Muhammet Orazov 
>> >  wrote:
>> >>
>> >> Hey Xuannan,
>> >>
>> >> Thanks for the FLIP and your efforts!
>> >>
>> >> Minor clarification from my side:
>> >>
>> >> > We will relocate these ConfigOptions to a class that is included
>> >> > in the documentation generation.
>> >>
>> >> Would it make sense to define also in the FLIP the options class for
>> >> these variables? For example, GPUDriverOptions?
>> >>
>> >> Best,
>> >> Muhammet
>> >>
>> >> On 2024-04-09 08:20, Xuannan Su wrote:
>> >> > Hi all,
>> >> >
>> >> > I'd like to start a discussion on FLIP-442: General Improvement to
>> >> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
>> >> > provide users with a better experience with the existing
>> >> > configuration. This FLIP proposes several general improvements to the
>> >> > current configuration.
>> >> >
>> >> > Looking forward to everyone's feedback and suggestions. Thank you!
>> >> >
>> >> > Best regards,
>> >> > Xuannan
>> >> >
>> >> > [1]
>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
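
To make the ConfigOptionsDocGenerator idea quoted above a bit more concrete, here is a rough sketch of such a check. This is not the real generator code; the annotation is a local stand-in so the snippet stays self-contained.

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

final class AnnotationCheckSketch {

    // Stand-in for Flink's visibility annotations (e.g. @PublicEvolving, @Internal).
    @Retention(RetentionPolicy.RUNTIME)
    @interface VisibilityAnnotation {}

    /** Fails the documentation build if an options class lacks the expected annotation. */
    static void checkAnnotated(Class<?> optionsClass) {
        if (!optionsClass.isAnnotationPresent(VisibilityAnnotation.class)) {
            // Throwing here makes the doc generation step, and therefore
            // the build, fail for the offending class.
            throw new IllegalStateException(
                    optionsClass.getName() + " is missing a visibility annotation.");
        }
    }
}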


[jira] [Created] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35089:
--

 Summary: Two input AbstractStreamOperator may throw NPE when 
receiving RecordAttributes
 Key: FLINK-35089
 URL: https://issues.apache.org/jira/browse/FLINK-35089
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.19.0
Reporter: Xuannan Su


Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the operator is 
deserialized on the TaskManager, which may cause an NPE.

To fix it, we propose to make `RecordAttributes` serializable and these 
fields non-transient.
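
For illustration, the failure mode looks roughly like the following sketch (not the actual Flink classes):

import java.io.Serializable;

class TwoInputOperatorSketch implements Serializable {
    // Initialized when the operator is built on the client, but `transient`
    // means the values are dropped during serialization; after the operator
    // is deserialized on the TaskManager both fields are null, because Java
    // serialization does not re-run field initializers of a Serializable class.
    private transient Object lastRecordAttributes1 = new Object();
    private transient Object lastRecordAttributes2 = new Object();

    void processRecordAttributes1(Object attributes) {
        lastRecordAttributes1 = attributes;
        // Any dereference of the other side then fails with an NPE:
        System.out.println(lastRecordAttributes2.toString());
    }
}
// Proposed fix: make RecordAttributes serializable and the fields
// non-transient, so their values survive (de)serialization.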

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSSION] FLIP-442: FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-10 Thread Xuannan Su
Hi Muhammet,

Thanks for the suggestion. I updated the FLIP to include the options class.

Best regards,
Xuannan

On Wed, Apr 10, 2024 at 1:56 PM Muhammet Orazov
 wrote:
>
> Hey Xuannan,
>
> Thanks for the FLIP and your efforts!
>
> Minor clarification from my side:
>
> > We will relocate these ConfigOptions to a class that is included
> > in the documentation generation.
>
> Would it make sense to define also in the FLIP the options class for
> these variables? For example, GPUDriverOptions?
>
> Best,
> Muhammet
>
> On 2024-04-09 08:20, Xuannan Su wrote:
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> > provide users with a better experience with the existing
> > configuration. This FLIP proposes several general improvements to the
> > current configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0


Re: [DISCUSSION] FLIP-442: FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-10 Thread Xuannan Su
Hi Zakelly,

Thanks for the comments. I updated the FLIP accordingly.

Best regards,
Xuannan

On Wed, Apr 10, 2024 at 11:12 AM Zakelly Lan  wrote:
>
> Thanks Xuannan for driving this! +1 for cleaning these up.
>
> And minor comments: It seems the StateBackendOptions is already annotated
> with @PublicEvolving.
>
>
> Best,
> Zakelly
>
>
> On Tue, Apr 9, 2024 at 4:21 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> > provide users with a better experience with the existing
> > configuration. This FLIP proposes several general improvements to the
> > current configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
> >


[DISCUSSION] FLIP-442: FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-09 Thread Xuannan Su
Hi all,

I'd like to start a discussion on FLIP-442: General Improvement to
Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
provide users with a better experience with the existing
configuration. This FLIP proposes several general improvements to the
current configuration.

Looking forward to everyone's feedback and suggestions. Thank you!

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0


Re: [VOTE] FLIP-425: Asynchronous Execution Model

2024-03-28 Thread Xuannan Su
+1 (non-binding)

Best regards,
Xuannan

On Wed, Mar 27, 2024 at 6:28 PM Yanfei Lei  wrote:
>
> Hi everyone,
>
> Thanks for all the feedback about the FLIP-425: Asynchronous Execution
> Model [1]. The discussion thread is here [2].
>
> The vote will be open for at least 72 hours unless there is an
> objection or insufficient votes.
>
> [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> [2] https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
>
> Best regards,
> Yanfei


Re: [VOTE] FLIP-424: Asynchronous State APIs

2024-03-28 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan


On Wed, Mar 27, 2024 at 6:23 PM Zakelly Lan  wrote:
>
> Hi devs,
>
> I'd like to start a vote on the FLIP-424: Asynchronous State APIs [1]. The
> discussion thread is here [2].
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1] https://cwiki.apache.org/confluence/x/SYp3EQ
> [2] https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
>
>
> Best,
> Zakelly


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-21 Thread Xuannan Su
Congratulations!

Best regards,
Xuannan

On Fri, Mar 22, 2024 at 9:17 AM Charles Zhang  wrote:
>
> Congratulations!
>
> Best wishes,
> Charles Zhang
> from Apache InLong
>
>
> On Fri, Mar 22, 2024 at 04:16, Jeyhun Karimov  wrote:
>
> > Great news! Congratulations!
> >
> > Regards,
> > Jeyhun
> >
> > On Thu, Mar 21, 2024 at 2:00 PM Yuxin Tan  wrote:
> >
> > > Congratulations! Thanks for the efforts.
> > >
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Samrat Deb  于2024年3月21日周四 20:28写道:
> > >
> > > > Congratulations !
> > > >
> > > > Bests
> > > > Samrat
> > > >
> > > > On Thu, 21 Mar 2024 at 5:52 PM, Ahmed Hamdy 
> > > wrote:
> > > >
> > > > > Congratulations, great work and great news.
> > > > > Best Regards
> > > > > Ahmed Hamdy
> > > > >
> > > > >
> > > > > On Thu, 21 Mar 2024 at 11:41, Benchao Li 
> > wrote:
> > > > >
> > > > > > Congratulations, and thanks for the great work!
> > > > > >
> > > > > > Yuan Mei  于2024年3月21日周四 18:31写道:
> > > > > > >
> > > > > > > Thanks for driving these efforts!
> > > > > > >
> > > > > > > Congratulations
> > > > > > >
> > > > > > > Best
> > > > > > > Yuan
> > > > > > >
> > > > > > > On Thu, Mar 21, 2024 at 4:35 PM Yu Li  wrote:
> > > > > > >
> > > > > > > > Congratulations and look forward to its further development!
> > > > > > > >
> > > > > > > > Best Regards,
> > > > > > > > Yu
> > > > > > > >
> > > > > > > > On Thu, 21 Mar 2024 at 15:54, ConradJam 
> > > > wrote:
> > > > > > > > >
> > > > > > > > > Congrattulations!
> > > > > > > > >
> > > > > > > > > Leonard Xu  于2024年3月20日周三 21:36写道:
> > > > > > > > >
> > > > > > > > > > Hi devs and users,
> > > > > > > > > >
> > > > > > > > > > We are thrilled to announce that the donation of Flink CDC
> > > as a
> > > > > > > > > > sub-project of Apache Flink has completed. We invite you to
> > > > > explore
> > > > > > > > the new
> > > > > > > > > > resources available:
> > > > > > > > > >
> > > > > > > > > > - GitHub Repository: https://github.com/apache/flink-cdc
> > > > > > > > > > - Flink CDC Documentation:
> > > > > > > > > > https://nightlies.apache.org/flink/flink-cdc-docs-stable
> > > > > > > > > >
> > > > > > > > > > After Flink community accepted this donation[1], we have
> > > > > completed
> > > > > > > > > > software copyright signing, code repo migration, code
> > > cleanup,
> > > > > > website
> > > > > > > > > > migration, CI migration and github issues migration etc.
> > > > > > > > > > Here I am particularly grateful to Hang Ruan, Zhongqaing
> > > Gong,
> > > > > > > > Qingsheng
> > > > > > > > > > Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
> > > > contributors
> > > > > > for
> > > > > > > > their
> > > > > > > > > > contributions and help during this process!
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > For all previous contributors: The contribution process has
> > > > > > slightly
> > > > > > > > > > changed to align with the main Flink project. To report
> > bugs
> > > or
> > > > > > > > suggest new
> > > > > > > > > > features, please open tickets
> > > > > > > > > > Apache Jira (https://issues.apache.org/jira).  Note that
> > we
> > > > will
> > > > > > no
> > > > > > > > > > longer accept GitHub issues for these purposes.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Welcome to explore the new repository and documentation.
> > Your
> > > > > > feedback
> > > > > > > > and
> > > > > > > > > > contributions are invaluable as we continue to improve
> > Flink
> > > > CDC.
> > > > > > > > > >
> > > > > > > > > > Thanks everyone for your support and happy exploring Flink
> > > CDC!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Leonard
> > > > > > > > > > [1]
> > > > > > https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Best
> > > > > > > > >
> > > > > > > > > ConradJam
> > > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Best,
> > > > > > Benchao Li
> > > > > >
> > > > >
> > > >
> > >
> >


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-19 Thread Xuannan Su
Congratulations! Thanks for all the great work!

Best regards,
Xuannan

On Tue, Mar 19, 2024 at 1:31 PM Yu Li  wrote:
>
> Congrats and thanks all for the efforts!
>
> Best Regards,
> Yu
>
> On Tue, 19 Mar 2024 at 11:51, gongzhongqiang  
> wrote:
> >
> > Congrats! Thanks to everyone involved!
> >
> > Best,
> > Zhongqiang Gong
> >
> > On Mon, Mar 18, 2024 at 16:27, Lincoln Lee  wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of Apache
> >> Flink 1.19.0, which is the fisrt release for the Apache Flink 1.19 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the improvements
> >> for this bugfix release:
> >> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> >>
> >> We would like to thank all contributors of the Apache Flink community who
> >> made this release possible!
> >>
> >>
> >> Best,
> >> Yun, Jing, Martijn and Lincoln


Re: [VOTE] Release 1.19.0, release candidate #2

2024-03-12 Thread Xuannan Su
+1 (non-binding)

- Verified signature and checksum
- Verified that source distribution does not contain binaries
- Built from source code successfully
- Reviewed the release announcement PR

Best regards,
Xuannan

On Tue, Mar 12, 2024 at 2:18 PM Hang Ruan  wrote:
>
> +1 (non-binding)
>
> - Verified signatures and checksums
> - Verified that source does not contain binaries
> - Build source code successfully
> - Reviewed the release note and left a comment
>
> Best,
> Hang
>
> On Tue, Mar 12, 2024 at 11:23, Feng Jin  wrote:
>
> > +1 (non-binding)
> >
> > - Verified signatures and checksums
> > - Verified that source does not contain binaries
> > - Build source code successfully
> > - Run a simple sql query successfully
> >
> > Best,
> > Feng Jin
> >
> >
> > On Tue, Mar 12, 2024 at 11:09 AM Ron liu  wrote:
> >
> > > +1 (non binding)
> > >
> > > quickly verified:
> > > - verified that source distribution does not contain binaries
> > > - verified checksums
> > > - built source code successfully
> > >
> > >
> > > Best,
> > > Ron
> > >
> > > Jeyhun Karimov  于2024年3月12日周二 01:00写道:
> > >
> > > > +1 (non binding)
> > > >
> > > > - verified that source distribution does not contain binaries
> > > > - verified signatures and checksums
> > > > - built source code successfully
> > > >
> > > > Regards,
> > > > Jeyhun
> > > >
> > > >
> > > > On Mon, Mar 11, 2024 at 3:08 PM Samrat Deb 
> > > wrote:
> > > >
> > > > > +1 (non binding)
> > > > >
> > > > > - verified signatures and checksums
> > > > > - ASF headers are present in all expected file
> > > > > - No unexpected binaries files found in the source
> > > > > - Build successful locally
> > > > > - tested basic word count example
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Bests,
> > > > > Samrat
> > > > >
> > > > > On Mon, 11 Mar 2024 at 7:33 PM, Ahmed Hamdy 
> > > > wrote:
> > > > >
> > > > > > Hi Lincoln
> > > > > > +1 (non-binding) from me
> > > > > >
> > > > > > - Verified Checksums & Signatures
> > > > > > - Verified Source dists don't contain binaries
> > > > > > - Built source successfully
> > > > > > - reviewed web PR
> > > > > >
> > > > > >
> > > > > > Best Regards
> > > > > > Ahmed Hamdy
> > > > > >
> > > > > >
> > > > > > On Mon, 11 Mar 2024 at 15:18, Lincoln Lee 
> > > > > wrote:
> > > > > >
> > > > > > > Hi Robin,
> > > > > > >
> > > > > > > Thanks for helping verifying the release note[1], FLINK-14879
> > > should
> > > > > not
> > > > > > > have been included, after confirming this
> > > > > > > I moved all unresolved non-blocker issues left over from 1.19.0
> > to
> > > > > 1.20.0
> > > > > > > and reconfigured the release note [1].
> > > > > > >
> > > > > > > Best,
> > > > > > > Lincoln Lee
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> > > > > > >
> > > > > > >
> > > > > > > Robin Moffatt  于2024年3月11日周一
> > 19:36写道:
> > > > > > >
> > > > > > > > Looking at the release notes [1] it lists `DESCRIBE DATABASE`
> > > > > > > (FLINK-14879)
> > > > > > > > and `DESCRIBE CATALOG` (FLINK-14690).
> > > > > > > > When I try these in 1.19 RC2 the behaviour is as in 1.18.1,
> > i.e.
> > > it
> > > > > is
> > > > > > > not
> > > > > > > > supported:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > [INFO] Execute statement succeed.
> > > > > > > >
> > > > > > > > Flink SQL> show catalogs;
> > > > > > > > +-+
> > > > > > > > |catalog name |
> > > > > > > > +-+
> > > > > > > > |   c_new |
> > > > > > > > | default_catalog |
> > > > > > > > +-+
> > > > > > > > 2 rows in set
> > > > > > > >
> > > > > > > > Flink SQL> DESCRIBE CATALOG c_new;
> > > > > > > > [ERROR] Could not execute SQL statement. Reason:
> > > > > > > > org.apache.calcite.sql.validate.SqlValidatorException: Column
> > > > 'c_new'
> > > > > > not
> > > > > > > > found in any table
> > > > > > > >
> > > > > > > > Flink SQL> show databases;
> > > > > > > > +--+
> > > > > > > > |database name |
> > > > > > > > +--+
> > > > > > > > | default_database |
> > > > > > > > +--+
> > > > > > > > 1 row in set
> > > > > > > >
> > > > > > > > Flink SQL> DESCRIBE DATABASE default_database;
> > > > > > > > [ERROR] Could not execute SQL statement. Reason:
> > > > > > > > org.apache.calcite.sql.validate.SqlValidatorException: Column
> > > > > > > > 'default_database' not found in
> > > > > > > > any table
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Is this an error in the release notes, or my mistake in
> > > > interpreting
> > > > > > > them?
> > > > > > > >
> > > > > > > > thanks, Robin.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> > > > > > > >
> > > > > > > > On Thu, 7 Mar 2024 at 10:01, 

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-11 Thread Xuannan Su
Hi Zakelly,

Thanks for the quick response.

> It will be used in callback chaining cases where some branch within one
> callback does nothing. I'm in favor of short phrases to express the
> functionalities. Thus I suggest `completedVoidFuture` or `voidFuture`, WDTY?

I'd prefer `completedVoidFuture` for consistency.
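
To make the callback-chaining case concrete, here is a minimal, self-contained sketch using plain java.util.concurrent.CompletableFuture as an analogy, not the StateFuture API proposed in FLIP-424; the name `completedVoidFuture` below is illustrative and only mirrors the naming discussed above.

```
// Analogy only: plain CompletableFuture, not the FLIP-424 StateFuture API.
import java.util.concurrent.CompletableFuture;

public class CompletedVoidFutureExample {

    // Factory for a callback branch that has nothing left to do.
    static CompletableFuture<Void> completedVoidFuture() {
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> handle(int value) {
        if (value < 0) {
            // One branch of the chain does nothing: return an already-completed
            // void future so the rest of the chain can still be attached.
            return completedVoidFuture();
        }
        // The other branch performs some asynchronous work.
        return CompletableFuture.runAsync(() -> System.out.println("processed " + value));
    }

    public static void main(String[] args) {
        handle(-1).thenRun(() -> System.out.println("done"));
    }
}
```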

> Yes, this will be added in implementation, I just omitted them for easy
> reading.

I think the JavaDoc is as important as the method itself, so it's
better we also review the JavaDoc as part of the API.

Best regards,
Xuannan



On Mon, Mar 11, 2024 at 5:34 PM Zakelly Lan  wrote:
>
> Hi Xuannan,
>
> Thanks for your comments!
>
> 1. The name `emptyFuture` seems a little unintuitive, and it is hard
> > to understand in what use case the `emptyFuture` should be used. If I
> > understand correctly, it is similar to the
> > FutureUtils#completedVoidFuture. How about naming it
> > completedVoidStateFuture?
>
> It will be used in callback chaining cases where some branch within one
> callback does nothing. I'm in favor of short phrases to express the
> functionalities. Thus I suggest `completedVoidFuture` or `voidFuture`, WDTY?
>
> 2. IIUC, the `FutureUtils` is intended to be used by the user. If
> > that's the case, `FutureUtils` should be annotated as a public
> > interface, such as `PublicEvolving`.
> >
> Yes I missed that, thanks for the reminder.
>
> 3. The state classes, such as `ValueState`, `ListState`, etc., are
> > essential for users, and we should add JavaDocs to those classes and
> > their methods.
>
> Yes, this will be added in implementation, I just omitted them for easy
> reading.
>
>
> Thanks & Best,
> Zakelly
>
> On Mon, Mar 11, 2024 at 5:25 PM Zakelly Lan  wrote:
>
> > Hi Yunfeng,
> >
> > Thanks for your comments!
> >
> > +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> >> having both get() and asyncGet() in the same State class. As a
> >> supplement to its benefits, this design could help avoid having users
> >> to use sync and async API in a mixed way (unless they create both a
> >> State and an AsyncState from the same state descriptor), which is
> >> supposed to bring suboptimal performance according to the FLIP's
> >> description.
> >
> >
> > Actually splitting APIs into two sets of classes also brings some
> > difficulties. In this case, users must explicitly define their usage before
> > actually doing state access. It is a little strange that the user can
> > define a sync and an async version of State with the same name, while they
> > cannot allocate two async States with the same name.
> > Another reason for distinguishing API by their method name instead of
> > class name is that users typically use the State instances to access state
> > but forget their type/class. For example:
> > ```
> > SyncState a = getState(xxx);
> > AsyncState b = getAsyncState(xxx);
> > //...
> > a.update(1);
> > b.update(1);
> > ```
> > Users are likely to think there is no difference between the `a.update(1)`
> > and `b.update(1)`, since they may forget the type for `a` and `b`. Thus I
> > proposed to distinguish the behavior in method names.
> > As for the suboptimal performance with mixed usage of sync and async, my
> > proposal is to warn them in runtime.
> >
> > I noticed that the FLIP proposes to place the newly introduced API in
> >> the package "org.apache.flink.api.common.state.v2", which seems a
> >> little strange to me as there has not been such a naming pattern
> >> ".v2." for packages in Flink.
> >
> >
> > In fact, there are some similar existing patterns, like
> > `org.apache.flink.streaming.api.functions.sink.v2` and
> > `org.apache.flink.streaming.api.connector.sink2`.
> >
> >  I would suggest discussing this topic
> >> with the main authors of Datastream V2, like Weijie Guo, so that the
> >> newly introduced APIs from both sides comply with a unified naming
> >> style.
> >
> > I'm afraid we are facing a different situation with the Datastream V2. For
> > total reconstruction of Datastream API, it is big enough to build a
> > seperate module and keep good package names. While for state APIs, we
> > should stay in the flink-core(-api) module alongside with other
> > apis, currently I tend to compromise at the expense of naming style.
> >
> >
> > Looking forward to hearing from you again!
> >
> > Thanks & Best,
> > Zakelly
> >
> > On Mon, Mar 11, 2024 at 4:20 PM Yunfeng Zhou 
> > wrote:
> >
> >> Hi Zakelly,
> >>
> >> Thanks for the proposal! The structure of the Async API generally
> >> looks good to me. Some comments on the details of the design are as
> >> follows.
> >>
> >> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> >> having both get() and asyncGet() in the same State class. As a
> >> supplement to its benefits, this design could help avoid having users
> >> to use sync and async API in a mixed way (unless they create both a
> >> State and an AsyncState from the same state descriptor), which is
> >> supposed to bring 

Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-11 Thread Xuannan Su
Hi Zakelly,

Thank you for the proposal! Please find my comments below.

1. The name `emptyFuture` seems a little unintuitive, and it is hard
to understand in what use case the `emptyFuture` should be used. If I
understand correctly, it is similar to the
FutureUtils#completedVoidFuture. How about naming it
completedVoidStateFuture?

2. IIUC, the `FutureUtils` is intended to be used by the user. If
that's the case, `FutureUtils` should be annotated as a public
interface, such as `PublicEvolving`.

3. The state classes, such as `ValueState`, `ListState`, etc., are
essential for users, and we should add JavaDocs to those classes and
their methods.

Best regards,
Xuannan
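
Illustrating the second point above, a minimal sketch of marking a user-facing utility class with Flink's @PublicEvolving annotation; the class and method below are placeholders, not the actual FutureUtils proposed in the FLIP.

```
// Sketch only: shows how a user-facing utility would be annotated as public API.
import org.apache.flink.annotation.PublicEvolving;

import java.util.concurrent.CompletableFuture;

/** Utility methods intended for users, hence annotated as a public (evolving) API. */
@PublicEvolving
public final class ExampleFutureUtils {

    private ExampleFutureUtils() {}

    /** Returns an already-completed void future. */
    public static CompletableFuture<Void> completedVoidFuture() {
        return CompletableFuture.completedFuture(null);
    }
}
```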

On Mon, Mar 11, 2024 at 4:21 PM Yunfeng Zhou
 wrote:
>
> Hi Zakelly,
>
> Thanks for the proposal! The structure of the Async API generally
> looks good to me. Some comments on the details of the design are as
> follows.
>
> +1 for JingGe's suggestion to introduce an AsyncState API, instead of
> having both get() and asyncGet() in the same State class. As a
> supplement to its benefits, this design could help avoid having users
> to use sync and async API in a mixed way (unless they create both a
> State and an AsyncState from the same state descriptor), which is
> supposed to bring suboptimal performance according to the FLIP's
> description.
>
> I noticed that the FLIP proposes to place the newly introduced API in
> the package "org.apache.flink.api.common.state.v2", which seems a
> little strange to me as there has not been such a naming pattern
> ".v2." for packages in Flink. I would suggest discussing this topic
> with the main authors of Datastream V2, like Weijie Guo, so that the
> newly introduced APIs from both sides comply with a unified naming
> style. If we reach an agreement on the first comment, my personal idea
> is that we can place the AsyncState interfaces to
> "org.apache.flink.api.common.state.async", and the existing state APIs
> to "org.apache.flink.api.common.state" or
> "org.apache.flink.api.common.state.sync".
>
> Best regards,
> Yunfeng Zhou
>
> On Thu, Mar 7, 2024 at 4:48 PM Zakelly Lan  wrote:
> >
> > Hi devs,
> >
> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> > State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
> > Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
> >
> >  - FLIP-424: Asynchronous State APIs [2]
> >
> > This FLIP introduces new APIs for asynchronous state access.
> >
> > Please make sure you have read the FLIP-423[1] to know the whole story, and
> > we'll discuss the details of FLIP-424[2] under this mail. For the
> > discussion of overall architecture or topics related with multiple
> > sub-FLIPs, please post in the previous mail[3].
> >
> > Looking forward to hearing from you!
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> >
> >
> > Best,
> > Zakelly


Re: [VOTE] FLIP-410: Config, Context and Processing Timer Service of DataStream API V2

2024-02-26 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan

On Tue, Feb 27, 2024 at 9:37 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Mon, Feb 26, 2024 at 6:10 PM weijie guo 
> wrote:
>
> > Hi everyone,
> >
> >
> > Thanks for all the feedback about the FLIP-410: Config, Context and
> > Processing Timer Service of DataStream API V2 [1]. The discussion
> > thread is here [2].
> >
> >
> > The vote will be open for at least 72 hours unless there is an
> > objection or insufficient votes.
> >
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> >
> > [2] https://lists.apache.org/thread/70gf028c5gsdb9qhsgpht0chzyp9nogc
> >
> >
> > Best regards,
> >
> > Weijie
> >


Re: [VOTE] FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2024-02-26 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan

On Tue, Feb 27, 2024 at 9:38 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Mon, Feb 26, 2024 at 6:09 PM weijie guo 
> wrote:
>
> > Hi everyone,
> >
> >
> > Thanks for all the feedback about the FLIP-409: DataStream V2 Building
> > Blocks: DataStream, Partitioning and ProcessFunction [1]. The
> > discussion thread is here [2].
> >
> >
> > The vote will be open for at least 72 hours unless there is an
> > objection or insufficient votes.
> >
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >
> > [2] https://lists.apache.org/thread/cwds0bwbgy3lfdgnlqbfhm6lfvx2qbrv
> >
> >
> > Best regards,
> >
> > Weijie
> >


Re: [VOTE] FLIP-408: [Umbrella] Introduce DataStream API V2

2024-02-26 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan


On Tue, Feb 27, 2024 at 9:36 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Mon, Feb 26, 2024 at 6:08 PM weijie guo 
> wrote:
>
> > Hi everyone,
> >
> >
> > Thanks for all the feedback about the FLIP-408: [Umbrella] Introduce
> > DataStream API V2 [1]. The discussion thread is here [2].
> >
> >
> > The vote will be open for at least 72 hours unless there is an
> > objection or insufficient votes.
> >
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> >
> > [2] https://lists.apache.org/thread/w8olky9s7fo5h8fl3nj3qbym307zk2l0
> >
> > Best regards,
> >
> > Weijie
> >


[RESULT][VOTE] FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment

2024-02-05 Thread Xuannan Su
Hi devs,

I'm happy to announce that FLIP-331: Support EndOfStreamTrigger and
isOutputOnlyAfterEndOfStream operator attribute to optimize task
deployment [1] has been accepted with 6 approving votes (4 binding)
[2]:

- Xintong Song (binding)
- Rui Fan (binding)
- Weijie Guo (binding)
- Dong Lin (binding)
- Hang Ruan (non-binding)
- Yuxin Tan (non-binding)

There are no disapproving votes.

Thanks again to everyone who participated in the discussion and voting.

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
[2] https://lists.apache.org/thread/oy9mdmh6gk8pc0wjdk5kg8dz3jllz9ow


Re: [VOTE] FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment

2024-02-05 Thread Xuannan Su
Hi all,

Thank you all! Closing the vote. The result will be announced in a
separate email.

Best regards,
Xuannan

On Mon, Feb 5, 2024 at 12:11 PM Yuxin Tan  wrote:
>
> +1 (non-binding)
>
> Best,
> Yuxin
>
>
> Hang Ruan  于2024年2月5日周一 11:22写道:
>
> >  +1 (non-binding)
> >
> > Best,
> > Hang
> >
> > Dong Lin  于2024年2月5日周一 11:08写道:
> >
> > > Thanks for the FLIP.
> > >
> > > +1 (binding)
> > >
> > > Best,
> > > Dong
> > >
> > > On Wed, Jan 31, 2024 at 11:41 AM Xuannan Su 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks for all the feedback about the FLIP-331: Support
> > > > EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute
> > > > to optimize task deployment [1] [2].
> > > >
> > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > hours (excluding weekends, until Feb 5, 12:00 AM GMT) unless there is an
> > > > objection or an insufficient number of votes.
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
> > > > [2] https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5
> > > >
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > >
> >


Re: [DISCUSS] FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2024-01-31 Thread Xuannan Su
treamProcessFunction? It would be
> > > > helpful in online machine learning cases, where a process function
> > > > needs to receive the first machine learning model before it can start
> > > > predictions on input data. Similar requirements might also exist in
> > > > Flink CEP, where a rule set needs to be consumed by the process
> > > > function first before it can start matching the event stream against
> > > > CEP patterns.
> > > >
> > > > Good point! I think we can provide a `nextInputSelection()` method for
> > > > `TwoInputStreamProcessFunction`.  It returns a ·First/Second· enum that
> > > > determines which Input the mailbox thread will read next. But I'm
> > > > considering putting it in the sub-FLIP related to Join, since features
> > > like
> > > > HashJoin have a more specific need for this.
> > > >
> > > > > A typo might exist in the current FLIP describing the API to
> > > > generate a global stream, as I can see either global() or coalesce()
> > > > in different places of the FLIP. These two methods might need to be
> > > > unified into one method.
> > > >
> > > > Good catch! I have updated this FLIP to fix this typo.
> > > >
> > > > > The order of parameters in the current ProcessFunction is (record,
> > > > context, output), while this FLIP proposes to change the order into
> > > > (record, output, context). Is there any reason to make this change?
> > > >
> > > > No, it's just the order we decide. But please note that there is no
> > > > relationship between the two ProcessFunction's anyway. I think it's
> > okay
> > > to
> > > > use our own order of parameters in new API.
> > > >
> > > > 4. Why does this FLIP propose to use connectAndProcess() instead of
> > > > connect() (+ keyBy()) + process()? The latter looks simpler to me.
> > > >
> > > > > I actually also considered this way at first, but it would have to
> > > > introduce some concepts like ConnectedStreams. But we hope that streams
> > > > will be more clearly defined in the DataStream API, otherwise we will
> > end
> > > > up going the same way as the original API, which you have to understand
> > > > `JoinedStreams/ConnectedStreams` and so on.
> > > >
> > > >
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > weijie guo  于2024年1月30日周二 11:20写道:
> > > >
> > > >> Hi Wencong:
> > > >>
> > > >> Thank you for your attention
> > > >>
> > > >> > Q1. Other DataStream types are converted into
> > > >> Non-Keyed DataStreams by using a "shuffle" operation
> > > >> to convert Input into output. Does this "shuffle" include the
> > > >> various repartition operations (rebalance/rescale/shuffle)
> > > >> from DataStream V1?
> > > >>
> > > >> Yes, The name `shuffle` is used only to represent the transformation
> > of
> > > >> an arbitrary stream into a non-keyed partitioned stream and does not
> > > >> restrict how the data is partitioned.
> > > >>
> > > >>
> > > >> > Q2. Why is the design for TwoOutputStreamProcessFunction,
> > > >> when dealing with a KeyedStream, only outputting combinations
> > > >> of (Keyed + Keyed) and (Non-Keyed + Non-Keyed)?
> > > >>
> > > >> In theory, we could only provide functions that return Non-Keyed
> > > streams.
> > > >> If you do want a KeyedStream, you explicitly convert it to a
> > KeyedStream
> > > >> via keyBy. However, because sometimes data is processed without
> > changing
> > > >> the partition, we choose to provide an additional KeyedStream
> > > counterpart
> > > >> to reduce the shuffle overhead. We didn't introduce the non-keyed +
> > > >> keyed combo here simply because it's not very common, and if we really
> > > see
> > > >> a lot of users asking for it later on, it's easy to support it then.
> > > >>
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Weijie
> > > >>
> > > >>
> > > >> Xuannan Su  于2024年1月29日周一 18:28写道:
>

[VOTE] FLIP-331: Support EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute to optimize task deployment

2024-01-30 Thread Xuannan Su
Hi everyone,

Thanks for all the feedback about the FLIP-331: Support
EndOfStreamTrigger and isOutputOnlyAfterEndOfStream operator attribute
to optimize task deployment [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours (excluding weekends, until Feb 5, 12:00 AM GMT) unless there is an
objection or an insufficient number of votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment
[2] https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5


Best,
Xuannan


Re: Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2024-01-29 Thread Xuannan Su
Hi all,

Thanks for the comments and suggestions. If there are no further
comments, I will open the voting thread tomorrow.

Best regards,
Xuannan
On Fri, Jan 26, 2024 at 3:16 PM Dong Lin  wrote:
>
> Thanks Xuannan for the update!
>
> +1 (binding)
>
> On Wed, Jan 10, 2024 at 5:54 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > After several rounds of offline discussions with Xingtong and Jinhao,
> > we have decided to narrow the scope of the FLIP. It will now focus on
> > introducing OperatorAttributes that indicate whether an operator emits
> > records only after inputs have ended. We will also use the attribute
> > to optimize task scheduling for better resource utilization. Setting
> > the backlog status and optimizing the operator implementation during
> > the backlog will be deferred to future work.
> >
> > In addition to the change above, we also make the following changes to
> > the FLIP to address the problems mentioned by Dong:
> > - Public interfaces are updated to reuse the GlobalWindows.
> > - Instead of making all outputs of the upstream operators of the
> > "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make
> > the output of the operator with "isOutputOnlyAfterEndOfStream=true"
> > blocking. This can prevent the second problem Dong mentioned. In the
> > future, we may introduce an extra OperatorAttributes to indicate if an
> > operator has any side output.
> >
> > I would greatly appreciate any comment or feedback you may have on the
> > updated FLIP.
> >
> > Best regards,
> > Xuannan
> >
> > On Tue, Sep 26, 2023 at 11:24 AM Dong Lin  wrote:
> > >
> > > Hi all,
> > >
> > > Thanks for the review!
> > >
> > > Becket and I discussed this FLIP offline and we agreed on several things
> > > that need to be improved with this FLIP. I will summarize our discussion
> > > with the problems and TODOs. We will update the FLIP and let you know
> > once
> > > the FLIP is ready for review again.
> > >
> > > 1) Investigate whether it is possible to update the existing
> > GlobalWindows
> > > in a backward-compatible way and re-use it for the same purpose
> > > as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
> > > class.
> > >
> > > Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance
> > > which will not trigger window's computation even on end-of-inputs. We
> > will
> > > need to investigate its existing usage and see if we can re-use it in a
> > > backward-compatible way.
> > >
> > > 2) Let JM know whether any operator in the upstream of the operator with
> > > "isOutputOnEOF=true" will emit output via any side channel. The FLIP
> > should
> > > update the execution mode of those operators *only if* all outputs from
> > > those operators are emitted only at the end of input.
> > >
> > > More specifically, the upstream operator might involve a user-defined
> > > operator that might emit output directly to an external service, where
> > the
> > > emission operation is not explicitly expressed as an operator's output
> > edge
> > > and thus not visible to JM. Similarly, it is also possible for the
> > > user-defined operator to register a timer
> > > via InternalTimerService#registerEventTimeTimer and emit output to an
> > > external service inside Triggerable#onEventTime. There is a chance that
> > > users still need related logic to output data in real-time, even if the
> > > downstream operators have isOutputOnEOF=true.
> > >
> > > One possible solution to address this problem is to add an extra
> > > OperatorAttribute to specify whether this operator might output records
> > in
> > > such a way that does not go through operator's output (e.g. side output).
> > > Then the JM can safely enable the runtime optimization currently
> > described
> > > in the FLIP when there is no such operator.
> > >
> > > 3) Create a follow-up FLIP that allows users to specify whether a source
> > > with Boundedness=bounded should have isProcessingBacklog=true.
> > >
> > > This capability would effectively introduce a 3rd strategy to set backlog
> > > status (in addition to FLIP-309 and FLIP-328). It might be useful to note
> > > that, even though the data in bounded sources are backlog data in most
> > > practical use-cases, it is not necessarily true. For example, users might

Re: [DISCUSS] FLIP-410: Config, Context and Processing Timer Service of DataStream API V2

2024-01-29 Thread Xuannan Su
Hi Weijie,

Thanks for the FLIP! I have a few questions regarding the FLIP.

1. +1 to only use XXXPartitionStream if users only need to use the
configurable PartitionStream.  If there are use cases for both,
perhaps we could use `ProcessConfigurableNonKeyedPartitionStream` or
`ConfigurableNonKeyedPartitionStream` for simplicity.

2. Should we allow users to set custom configurations through the
`ProcessConfigurable` interface and access these configurations in the
`ProcessFunction` via `RuntimeContext`? I believe it would be useful
for process function developers to be able to define custom
configurations.

3. How can users define custom metrics within the `ProcessFunction`?
Will there be a method like `getMetricGroup` available in the
`RuntimeContext`?
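
For comparison, this is how question 3's pattern looks today in DataStream V1, where a rich function obtains a metric group from its RuntimeContext; whether the V2 RuntimeContext offers the same entry point is exactly the open question. The counter name is illustrative.

```
// DataStream V1 pattern for user-defined metrics; shown for comparison only.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processedRecords;

    @Override
    public void open(Configuration parameters) {
        // RuntimeContext#getMetricGroup is the V1 entry point for custom metrics.
        processedRecords = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processedRecords.inc();
        return value;
    }
}
```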

Best,
Xuannan


On Fri, Jan 26, 2024 at 2:38 PM Yunfeng Zhou
 wrote:
>
> Hi Weijie,
>
> Thanks for introducing this FLIP! I have a few questions about the
> designs proposed.
>
> 1. Would it be better to have all XXXPartitionStream classes implement
> ProcessConfigurable, instead of defining both XXXPartitionStream and
> ProcessConfigurableAndXXXPartitionStream? I wonder whether users would
> need to operate on a non-configurable PartitionStream.
>
> 2. The name "ProcessConfigurable" seems a little ambiguous to me. Will
> there be classes other than XXXPartitionStream that implement this
> interface? Will "Process" be accurate enough to describe
> PartitionStream and those classes?
>
> 3. Apart from the detailed withConfigFoo(foo)/withConfigBar(bar)
> methods, would it be better to also add a general
> withConfig(configKey, configValue) method to the ProcessConfigurable
> interface? Adding a method for each configuration might harm the
> readability and compatibility of configurations.
>
> Looking forward to your response.
>
> Best regards,
> Yunfeng Zhou
>
> On Tue, Dec 26, 2023 at 2:47 PM weijie guo  wrote:
> >
> > Hi devs,
> >
> >
> > I'd like to start a discussion about FLIP-410: Config, Context and
> > Processing Timer Service of DataStream API V2 [1]. This is the second
> > sub-FLIP of DataStream API V2.
> >
> >
> > In FLIP-409 [2], we have defined the most basic primitive of
> > DataStream V2. On this basis, this FLIP will further answer several
> > important questions closely related to it:
> >
> >1.
> >How to configure the processing over the datastreams, such as
> > setting the parallelism.
> >2.
> >How to get access to the runtime contextual information and
> > services from inside the process functions.
> >3. How to work with processing-time timers.
> >
> > You can find more details in this FLIP. Its relationship with other
> > sub-FLIPs can be found in the umbrella FLIP
> > [3].
> >
> >
> > Looking forward to hearing from you, thanks!
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2


Re: [DISCUSS] FLIP-409: DataStream V2 Building Blocks: DataStream, Partitioning and ProcessFunction

2024-01-29 Thread Xuannan Su
Hi Weijie,

Thank you for driving the design of the new DataStream API. I have a
few questions regarding the FLIP:

1. In the partitioning section, it says that "broadcast can only be
used as a side-input of other Inputs." Could you clarify what is meant
by "side-input"? If I understand correctly, it refer to one of the
inputs of the `TwoInputStreamProcessFunction`. If that is the case,
the term "side-input" may not be accurate.

2. Is there a particular reason we do not support a
`TwoInputProcessFunction` to combine a KeyedStream with a
BroadcastStream to result in a KeyedStream? There seems to be a valid
use case where a KeyedStream is enriched with a BroadcastStream and
returns a stream that is partitioned in the same way (a DataStream V1
sketch of this pattern follows after question 5 below).

3. There appears to be a typo in the example code. The
`SingleStreamProcessFunction` should probably be
`OneInputStreamProcessFunction`.

4. How do we set the global configuration for the
ExecutionEnvironment? Currently, we have the
StreamExecutionEnvironment.getExecutionEnvironment(Configuration)
method to provide the global configuration in the API.

5. I noticed that there are two `collect` methods in the Collector,
one with a timestamp and one without. Could you elaborate on the
differences between them? Additionally, in what use case would one use
the method that includes the timestamp?
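
To make question 2 concrete, this is roughly how the enrichment pattern is written against the DataStream V1 broadcast state API today; note that the result is a plain DataStream rather than a KeyedStream, even though the keyed side's partitioning is untouched. The types and descriptor name are illustrative.

```
// DataStream V1 sketch: enriching a keyed stream with a broadcast rule stream.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastEnrichmentSketch {

    public static DataStream<String> enrich(KeyedStream<Event, String> events, DataStream<Rule> rules) {
        MapStateDescriptor<String, Rule> ruleDescriptor =
                new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

        BroadcastStream<Rule> broadcastRules = rules.broadcast(ruleDescriptor);

        // The result is no longer a KeyedStream, even though the keyed input's
        // partitioning is not changed -- the point raised in question 2.
        return events.connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, Event, Rule, String>() {
                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        out.collect(event.id);
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
                            throws Exception {
                        ctx.getBroadcastState(ruleDescriptor).put(rule.name, rule);
                    }
                });
    }

    public static class Event { public String id; }
    public static class Rule { public String name; }
}
```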

Best regards,
Xuannan



On Fri, Jan 26, 2024 at 2:21 PM Yunfeng Zhou
 wrote:
>
> Hi Weijie,
>
> Thanks for raising discussions about the new DataStream API. I have a
> few questions about the content of the FLIP.
>
> 1. Will we provide any API to support choosing which input to consume
> between the two inputs of TwoInputStreamProcessFunction? It would be
> helpful in online machine learning cases, where a process function
> needs to receive the first machine learning model before it can start
> predictions on input data. Similar requirements might also exist in
> Flink CEP, where a rule set needs to be consumed by the process
> function first before it can start matching the event stream against
> CEP patterns.
>
> 2. A typo might exist in the current FLIP describing the API to
> generate a global stream, as I can see either global() or coalesce()
> in different places of the FLIP. These two methods might need to be
> unified into one method.
>
> 3. The order of parameters in the current ProcessFunction is (record,
> context, output), while this FLIP proposes to change the order into
> (record, output, context). Is there any reason to make this change?
>
> 4. Why does this FLIP propose to use connectAndProcess() instead of
> connect() (+ keyBy()) + process()? The latter looks simpler to me.
>
> Looking forward to discussing these questions with you.
>
> Best regards,
> Yunfeng Zhou
>
> On Tue, Dec 26, 2023 at 2:44 PM weijie guo  wrote:
> >
> > Hi devs,
> >
> >
> > I'd like to start a discussion about FLIP-409: DataStream V2 Building
> > Blocks: DataStream, Partitioning and ProcessFunction [1].
> >
> >
> > As the first sub-FLIP for DataStream API V2, we'd like to discuss and
> > try to answer some of the most fundamental questions in stream
> > processing:
> >
> >1. What kinds of data streams do we have?
> >2. How to partition data over the streams?
> >3. How to define a processing on the data stream?
> >
> > The answer to these questions involve three core concepts: DataStream,
> > Partitioning and ProcessFunction. In this FLIP, we will discuss the
> > definitions and related API primitives of these concepts in detail.
> >
> >
> > You can find more details in FLIP-409 [1]. This sub-FLIP is at the
> > heart of the entire DataStream API V2, and its relationship with other
> > sub-FLIPs can be found in the umbrella FLIP [2].
> >
> >
> > Looking forward to hearing from you, thanks!
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2


Re: [DISCUSS] FLIP-408: [Umbrella] Introduce DataStream API V2

2024-01-29 Thread Xuannan Su
Hi Weijie,

Thanks for driving the work! There are indeed many pain points in the
current DataStream API, which are challenging to resolve with its
existing design. It is a great opportunity to propose a new DataStream
API that tackles these issues. I like the way we've divided the FLIP
into multiple sub-FLIPs; the roadmap is clear and comprehensible. +1
for the umbrella FLIP. I am eager to see the sub-FLIPs!

Best regards,
Xuannan




On Wed, Jan 24, 2024 at 8:55 PM Wencong Liu  wrote:
>
> Hi Weijie,
>
>
> Thank you for the effort you've put into the DataStream API ! By reorganizing 
> and
> redesigning the DataStream API, as well as addressing some of the unreasonable
> designs within it, we can enhance the efficiency of job development for 
> developers.
> It also allows developers to design more flexible Flink jobs to meet business 
> requirements.
>
>
> I have conducted a comprehensive review of the DataStream API design in 
> versions
> 1.18 and 1.19. I found quite a few functional defects in the DataStream API, 
> such as the
> lack of corresponding APIs in batch processing scenarios. In the upcoming 
> 1.20 version,
> I will further improve the DataStream API in batch computing scenarios.
>
>
> The issues existing in the old DataStream API (which can be referred to as 
> V1) can be
> addressed from a design perspective in the initial version of V2. I hope to 
> also have the
>  opportunity to participate in the development of DataStream V2 and make my 
> contribution.
>
>
> Regarding FLIP-408, I have a question: The Processing TimerService is 
> currently
> defined as one of the basic primitives, partly because it's understood that
> you have to choose between processing time and event time.
> The other part of the reason is that it needs to work based on the task's
> mailbox thread model to avoid concurrency issues. Could you clarify the second
> part of the reason?
>
> Best,
> Wencong Liu
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-12-26 14:42:20, "weijie guo"  wrote:
> >Hi devs,
> >
> >
> >I'd like to start a discussion about FLIP-408: [Umbrella] Introduce
> >DataStream API V2 [1].
> >
> >
> >The DataStream API is one of the two main APIs that Flink provides for
> >writing data processing programs. As an API that was introduced
> >practically since day-1 of the project and has been evolved for nearly
> >a decade, we are observing more and more problems of it. Improvements
> >on these problems require significant breaking changes, which makes
> >in-place refactor impractical. Therefore, we propose to introduce a
> >new set of APIs, the DataStream API V2, to gradually replace the
> >original DataStream API.
> >
> >
> >The proposal to introduce a whole set new API is complex and includes
> >massive changes. We are planning  to break it down into multiple
> >sub-FLIPs for incremental discussion. This FLIP is only used as an
> >umbrella, mainly focusing on motivation, goals, and overall planning.
> >That is to say, more design and implementation details  will be
> >discussed in other FLIPs.
> >
> >
> >Given that it's hard to imagine the detailed design of the new API if
> >we're just talking about this umbrella FLIP, and we probably won't be
> >able to give an opinion on it. Therefore, I have prepared two
> >sub-FLIPs [2][3] at the same time, and the discussion of them will be
> >posted later in separate threads.
> >
> >
> >Looking forward to hearing from you, thanks!
> >
> >
> >Best regards,
> >
> >Weijie
> >
> >
> >
> >[1]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> >
> >[2]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-409%3A+DataStream+V2+Building+Blocks%3A+DataStream%2C+Partitioning+and+ProcessFunction
> >
> >
> >[3]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2


Re: [VOTE] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-24 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan

On Thu, Jan 25, 2024 at 10:15 AM Lijie Wang  wrote:
>
> +1 (binding)
>
> Best,
> Lijie
>
> Yanfei Lei  于2024年1月25日周四 10:06写道:
>
> > +1 (binding)
> >
> > Hangxiang Yu  于2024年1月25日周四 10:00写道:
> > >
> > > +1 (binding)
> > >
> > > On Thu, Jan 25, 2024 at 8:49 AM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > > +1(binding)
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Wed, 24 Jan 2024 at 21:50, Zakelly Lan 
> > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > I'd like to start a vote on the FLIP-406: Reorganize State &
> > > > Checkpointing
> > > > > & Recovery Configuration [1]. The discussion thread is here [2].
> > > > >
> > > > > The vote will be open for at least 72 hours unless there is an
> > objection
> > > > or
> > > > > insufficient votes.
> > > > >
> > > > > [1]
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789560
> > > > > [2] https://lists.apache.org/thread/0oc10cr2q2ms855dbo29s7v08xs3bvqg
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Hangxiang.
> >
> >
> >
> > --
> > Best,
> > Yanfei
> >


[jira] [Created] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34085:
--

 Summary: Remove deprecated string configuration keys in Flink 2.0
 Key: FLINK-34085
 URL: https://issues.apache.org/jira/browse/FLINK-34085
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34084:
--

 Summary: Deprecate unused configuration in 
BinaryInput/OutputFormat and FileInput/OutputFormat
 Key: FLINK-34084
 URL: https://issues.apache.org/jira/browse/FLINK-34084
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and 
BinaryOutputFormat.java to deprecate unused string configuration keys.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34083:
--

 Summary: Deprecate string configuration keys and unused constants 
in ConfigConstants
 Key: FLINK-34083
 URL: https://issues.apache.org/jira/browse/FLINK-34083
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


* Update ConfigConstants.java to deprecate and replace string configuration keys
 * Mark unused constants in ConfigConstants.java as deprecated



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-14 Thread Xuannan Su
Hi devs,

I'm happy to announce that FLIP-405: Migrate string configuration key
to ConfigOption [1] has been accepted with 4 approving votes (3
binding) [2]:

- Rui Fan (binding)
- Hang Ruan (non-binding)
- Xintong Song (binding)
- Zhu Zhu (binding)

There are no disapproving votes.

Thanks again to everyone who participated in the discussion and voting.

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
[2] https://lists.apache.org/thread/joyr7bxpo0lcj1zfzdj5nv0lrhb303rx


Re: [VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-14 Thread Xuannan Su
Hi all,

Thank you all! Closing the vote. The result will be announced in a
separate email.

Best regards,
Xuannan

On Fri, Jan 12, 2024 at 2:44 PM Zhu Zhu  wrote:
>
> +1 (binding)
>
> Thanks,
> Zhu
>
> Xuannan Su  于2024年1月12日周五 14:24写道:
>
> > Hi all,
> >
> > I would like to clarify the statement regarding the first improvement
> > from the previous email, as it was incomplete. To be more specific, we
> > will also deprecate the getClass(String key, Class<? extends T>
> > defaultValue, ClassLoader classLoader) and setClass(String key,
> > Class<?> klazz), as they are intended for internal use only and are no
> > longer in use.
> >
> > To summarize, the first improvement should be as follows:
> >
> > We will mark the getBytes(String key, byte[] defaultValue) and
> > setBytes(String key, byte[] bytes) methods as @Internal, as they are
> > intended for internal use only. Additionally, we will deprecate the
> > getClass(String key, Class<? extends T> defaultValue,
> > ClassLoader classLoader) and setClass(String key, Class<?> klazz)
> > methods, as they are intended for internal use only and are no longer
> > in use.
> >
> > I apologize for any confusion caused by the incomplete information.
> >
> > Best regards,
> > Xuannan
> >
> > On Fri, Jan 12, 2024 at 1:36 PM Xuannan Su  wrote:
> > >
> > > Hi all,
> > >
> > > During voting, we identified two improvements we'd like to make to the
> > FLIP:
> > >
> > > - We will mark the getBytes(String key, byte[] defaultValue) and
> > > setBytes(String key, byte[] bytes) methods as @Internal, as they are
> > > intended for internal use only.
> > > - In addition to marking all getXxx(ConfigOption<Xxx> configOption) methods
> > > as @Deprecated, we will also mark the getXxx(ConfigOption<Xxx>
> > > configOption, Xxx overrideDefault) methods as @Deprecated. These will
> > > be replaced with T get(ConfigOption<T> configOption, T overrideDefault).
> > >
> > > We had an offline discussion with all the voters and reached a
> > > consensus on the above changes. Therefore, we will not initiate
> > > another voting thread unless there are objections.
> > >
> > > Best regards,
> > > Xuannan
> > >
> > >
> > > On Tue, Jan 9, 2024 at 11:43 AM Xintong Song 
> > wrote:
> > > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Mon, Jan 8, 2024 at 1:48 PM Hang Ruan 
> > wrote:
> > > >
> > > > > +1(non-binding)
> > > > >
> > > > > Best,
> > > > > Hang
> > > > >
> > > > > Rui Fan <1996fan...@gmail.com> 于2024年1月8日周一 13:04写道:
> > > > >
> > > > > > +1(binding)
> > > > > >
> > > > > > Best,
> > > > > > Rui
> > > > > >
> > > > > > On Mon, Jan 8, 2024 at 1:00 PM Xuannan Su 
> > wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Thanks for all the feedback about the FLIP-405: Migrate string
> > > > > > > configuration key to ConfigOption [1] [2].
> > > > > > >
> > > > > > > I'd like to start a vote for it. The vote will be open for at
> > least 72
> > > > > > > hours(excluding weekends,until Jan 11, 12:00AM GMT) unless there
> > is an
> > > > > > > objection or an insufficient number of votes.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
> > > > > > > [2]
> > https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Xuannan
> > > > > > >
> > > > > >
> > > > >
> >


Re: [DISCUSS] FLIP-416: Deprecate and remove the RestoreMode#LEGACY

2024-01-14 Thread Xuannan Su
Hi Zakelly,

Thanks for driving this. +1 to removing the LEGACY mode.

Best regards,
Xuannan
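
For context on what replaces LEGACY: the restore mode is chosen explicitly per job, for example through configuration when a snapshot path is supplied. The sketch below assumes the string keys introduced by FLIP-193 ("execution.savepoint.path" and "execution.savepoint-restore-mode"); the exact keys should be verified against the Flink version in use, and the path is a placeholder.

```
// Sketch: restoring from a snapshot with an explicit ownership mode instead of LEGACY.
// Configuration keys are assumed to match FLIP-193; verify them for your Flink version.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreModeExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString("execution.savepoint.path", "s3://bucket/savepoints/savepoint-123");
        // NO_CLAIM: Flink never depends on or deletes the old snapshot's files.
        // CLAIM: Flink takes ownership of the files and may delete them once unneeded.
        config.setString("execution.savepoint-restore-mode", "NO_CLAIM");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... build the job on env and call env.execute() ...
    }
}
```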

On Mon, Jan 15, 2024 at 3:22 AM Danny Cranmer  wrote:
>
> +1 to removing LEGACY mode in Flink 2.0. Thanks for driving.
>
> Danny,
>
> On Sat, 13 Jan 2024, 08:20 Yanfei Lei,  wrote:
>
> > Thanks Zakelly for starting this discussion.
> >
> > Regardless of whether it is for users or developers, deprecating
> > RestoreMode#LEGACY makes the semantics clearer and lower maintenance
> > costs, and Flink 2.0 is a good time point to do this.
> > So +1 for the overall idea.
> >
> > Best,
> > Yanfei
> >
> > Zakelly Lan  于2024年1月11日周四 14:57写道:
> >
> > >
> > > Hi devs,
> > >
> > > I'd like to start a discussion on FLIP-416: Deprecate and remove the
> > > RestoreMode#LEGACY[1].
> > >
> > > The FLIP-193[2] introduced two modes of state file ownership during
> > > checkpoint restoration: RestoreMode#CLAIM and RestoreMode#NO_CLAIM. The
> > > LEGACY mode, which was how Flink worked until 1.15, has been superseded
> > by
> > > NO_CLAIM as the default mode. The main drawback of LEGACY mode is that
> > the
> > > new job relies on artifacts from the old job without cleaning them up,
> > > leaving users uncertain about when it is safe to delete the old
> > checkpoint
> > > directories. This leads to the accumulation of unnecessary checkpoint
> > files
> > > that are never cleaned up. Considering cluster availability and job
> > > maintenance, it is not recommended to use LEGACY mode. Users could choose
> > > the other two modes to get a clear semantic for the state file ownership.
> > >
> > > This FLIP proposes to deprecate the LEGACY mode and remove it completely
> > in
> > > the upcoming Flink 2.0. This will make the semantic clear as well as
> > > eliminate many bugs caused by mode transitions involving LEGACY mode
> > (e.g.
> > > FLINK-27114 [3]) and enhance code maintainability.
> > >
> > > Looking forward to hearing from you!
> > >
> > > [1] https://cwiki.apache.org/confluence/x/ookkEQ
> > > [2] https://cwiki.apache.org/confluence/x/bIyqCw
> > > [3] https://issues.apache.org/jira/browse/FLINK-27114
> > >
> > > Best,
> > > Zakelly
> >


Re: [VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-11 Thread Xuannan Su
Hi all,

I would like to clarify the statement regarding the first improvement
from the previous email, as it was incomplete. To be more specific, we
will also deprecate the getClass(String key, Class<? extends T>
defaultValue, ClassLoader classLoader) and setClass(String key,
Class<?> klazz), as they are intended for internal use only and are no
longer in use.

To summarize, the first improvement should be as follows:

We will mark the getBytes(String key, byte[] defaultValue) and
setBytes(String key, byte[] bytes) methods as @Internal, as they are
intended for internal use only. Additionally, we will deprecate the
getClass(String key, Class<? extends T> defaultValue,
ClassLoader classLoader) and setClass(String key, Class<?> klazz)
methods, as they are intended for internal use only and are no longer
in use.
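
As a rough illustration of the direction (this is not text from the FLIP): the deprecated class accessors can be expressed with a typed ConfigOption holding the class name plus an explicit loading step. The option name and target class below are placeholders.

```
// Illustrative replacement for Configuration#getClass/#setClass:
// store the class name as a ConfigOption<String> and load it explicitly.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ClassOptionExample {

    public static final ConfigOption<String> FACTORY_CLASS =
            ConfigOptions.key("example.factory.class")
                    .stringType()
                    .defaultValue("com.example.DefaultFactory");

    public static Class<?> loadFactoryClass(Configuration config, ClassLoader classLoader)
            throws ClassNotFoundException {
        // Equivalent of the deprecated getClass(key, default, classLoader), expressed
        // via a typed ConfigOption plus an explicit class-loading step.
        return Class.forName(config.get(FACTORY_CLASS), true, classLoader);
    }
}
```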

I apologize for any confusion caused by the incomplete information.

Best regards,
Xuannan

On Fri, Jan 12, 2024 at 1:36 PM Xuannan Su  wrote:
>
> Hi all,
>
> During voting, we identified two improvements we'd like to make to the FLIP:
>
> - We will mark the getBytes(String key, byte[] defaultValue) and
> setBytes(String key, byte[] bytes) methods as @Internal, as they are
> intended for internal use only.
> - In addition to marking all getXxx(ConfigOption<Xxx> configOption) methods
> as @Deprecated, we will also mark the getXxx(ConfigOption<Xxx>
> configOption, Xxx overrideDefault) methods as @Deprecated. These will
> be replaced with T get(ConfigOption<T> configOption, T overrideDefault).
>
> We had an offline discussion with all the voters and reached a
> consensus on the above changes. Therefore, we will not initiate
> another voting thread unless there are objections.
>
> Best regards,
> Xuannan
>
>
> On Tue, Jan 9, 2024 at 11:43 AM Xintong Song  wrote:
> >
> > +1 (binding)
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Mon, Jan 8, 2024 at 1:48 PM Hang Ruan  wrote:
> >
> > > +1(non-binding)
> > >
> > > Best,
> > > Hang
> > >
> > > Rui Fan <1996fan...@gmail.com> 于2024年1月8日周一 13:04写道:
> > >
> > > > +1(binding)
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Mon, Jan 8, 2024 at 1:00 PM Xuannan Su  wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for all the feedback about the FLIP-405: Migrate string
> > > > > configuration key to ConfigOption [1] [2].
> > > > >
> > > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > > hours(excluding weekends,until Jan 11, 12:00AM GMT) unless there is an
> > > > > objection or an insufficient number of votes.
> > > > >
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
> > > > > [2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0
> > > > >
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > > >
> > >


Re: [VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-11 Thread Xuannan Su
Hi all,

During voting, we identified two improvements we'd like to make to the FLIP:

- We will mark the getBytes(String key, byte[] defaultValue) and
setBytes(String key, byte[] bytes) methods as @Internal, as they are
intended for internal use only.
- In addition to marking all getXxx(ConfigOption<Xxx> configOption) methods
as @Deprecated, we will also mark the getXxx(ConfigOption<Xxx>
configOption, Xxx overrideDefault) methods as @Deprecated. These will
be replaced with T get(ConfigOption<T> configOption, T overrideDefault).
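
A small sketch of the before/after access pattern described above; the option name is illustrative, and the dedicated get(option, overrideDefault) replacement is referenced only as the thread describes it.

```
// Sketch of FLIP-405-style access: typed ConfigOption instead of raw string keys.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ConfigOptionMigrationExample {

    private static final ConfigOption<Integer> RETRIES =
            ConfigOptions.key("example.retries").intType().defaultValue(3);

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(RETRIES, 5);

        // Deprecated styles targeted by the FLIP:
        //   config.getInteger("example.retries", 3);  // raw string key
        //   config.getInteger(RETRIES, 10);           // getXxx(option, overrideDefault)
        int retries = config.get(RETRIES);                         // option's own default applies
        int retriesOrTen = config.getOptional(RETRIES).orElse(10); // until get(option, overrideDefault) lands

        System.out.println(retries + " " + retriesOrTen);
    }
}
```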

We had an offline discussion with all the voters and reached a
consensus on the above changes. Therefore, we will not initiate
another voting thread unless there are objections.

Best regards,
Xuannan


On Tue, Jan 9, 2024 at 11:43 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Mon, Jan 8, 2024 at 1:48 PM Hang Ruan  wrote:
>
> > +1(non-binding)
> >
> > Best,
> > Hang
> >
> > Rui Fan <1996fan...@gmail.com> 于2024年1月8日周一 13:04写道:
> >
> > > +1(binding)
> > >
> > > Best,
> > > Rui
> > >
> > > On Mon, Jan 8, 2024 at 1:00 PM Xuannan Su  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks for all the feedback about the FLIP-405: Migrate string
> > > > configuration key to ConfigOption [1] [2].
> > > >
> > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > hours(excluding weekends,until Jan 11, 12:00AM GMT) unless there is an
> > > > objection or an insufficient number of votes.
> > > >
> > > >
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
> > > > [2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0
> > > >
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > >
> >


Re: Re: Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-10 Thread Xuannan Su
+1 (non-binding)

Best,
Xuannan

On Thu, Jan 11, 2024 at 10:28 AM Xuyang  wrote:
>
> +1 (non-binding)--
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2024-01-11 10:00:11,"Yang Wang"  写道:
> >+1 (binding)
> >
> >
> >Best,
> >Yang
> >
> >On Thu, Jan 11, 2024 at 9:53 AM liu ron  wrote:
> >
> >> +1 non-binding
> >>
> >> Best
> >> Ron
> >>
> >> Matthias Pohl  于2024年1月10日周三 23:05写道:
> >>
> >> > +1 (binding)
> >> >
> >> > On Wed, Jan 10, 2024 at 3:35 PM ConradJam  wrote:
> >> >
> >> > > +1 non-binding
> >> > >
> >> > > Dawid Wysakowicz  于2024年1月10日周三 21:06写道:
> >> > >
> >> > > > +1 (binding)
> >> > > > Best,
> >> > > > Dawid
> >> > > >
> >> > > > On Wed, 10 Jan 2024 at 11:54, Piotr Nowojski 
> >> > > wrote:
> >> > > >
> >> > > > > +1 (binding)
> >> > > > >
> >> > > > > śr., 10 sty 2024 o 11:25 Martijn Visser 
> >> > > > > napisał(a):
> >> > > > >
> >> > > > > > +1 (binding)
> >> > > > > >
> >> > > > > > On Wed, Jan 10, 2024 at 4:43 AM Xingbo Huang  >> >
> >> > > > wrote:
> >> > > > > > >
> >> > > > > > > +1 (binding)
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Xingbo
> >> > > > > > >
> >> > > > > > > Dian Fu  于2024年1月10日周三 11:35写道:
> >> > > > > > >
> >> > > > > > > > +1 (binding)
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > > Dian
> >> > > > > > > >
> >> > > > > > > > On Wed, Jan 10, 2024 at 5:09 AM Sharath <
> >> dsaishar...@gmail.com
> >> > >
> >> > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > +1 (non-binding)
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Sharath
> >> > > > > > > > >
> >> > > > > > > > > On Tue, Jan 9, 2024 at 1:02 PM Venkata Sanath Muppalla <
> >> > > > > > > > sanath...@gmail.com>
> >> > > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks,
> >> > > > > > > > > > Sanath
> >> > > > > > > > > >
> >> > > > > > > > > > On Tue, Jan 9, 2024 at 11:16 AM Peter Huang <
> >> > > > > > > > huangzhenqiu0...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best Regards
> >> > > > > > > > > > > Peter Huang
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Tue, Jan 9, 2024 at 5:26 AM Jane Chan <
> >> > > > > qingyue@gmail.com>
> >> > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Best,
> >> > > > > > > > > > > > Jane
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Tue, Jan 9, 2024 at 8:41 PM Lijie Wang <
> >> > > > > > > > wangdachui9...@gmail.com>
> >> > > > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Best,
> >> > > > > > > > > > > > > Lijie
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Jiabao Sun 
> >> > > > 于2024年1月9日周二
> >> > > > > > > > 19:28写道:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > Best,
> >> > > > > > > > > > > > > > Jiabao
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > On 2024/01/09 09:58:04 xiangyu feng wrote:
> >> > > > > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > Regards,
> >> > > > > > > > > > > > > > > Xiangyu Feng
> >> > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > Danny Cranmer  于2024年1月9日周二
> >> > > > 17:50写道:
> >> > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > +1 (binding)
> >> > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > > > > > Danny
> >> > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > On Tue, Jan 9, 2024 at 9:31 AM Feng Jin <
> >> > > > > > ji...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > Best,
> >> > > > > > > > > > > > > > > > > Feng Jin
> >> > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > On Tue, Jan 9, 2024 at 5:29 PM Yuxin Tan <
> >> > > > > > > > ta...@gmail.com>
> >> > > > > > > > > > > > wrote:
> >> > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > > +1 (non-binding)
> >> > > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > > Best,
> >> > > > > > > > > > > > > > > > > > Yuxin
> >> > > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > > Márton Balassi 
> >> > > 于2024年1月9日周二
> >> > > > > > 17:25写道:
> >> > > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > > > +1 (binding)
> >> > > > > > > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > > > > > > On Tue, Jan 9, 

Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-10 Thread Xuannan Su
our comments!
> > >> > > > >
> > >> > > > > IMO, given that the state backend can be plugably loaded (as you
> > >> can
> > >> > > > > specify a state backend factory), I prefer not providing state
> > >> > backend
> > >> > > > > specified options in the framework.
> > >> > > > >
> > >> > > > > Secondly, the incremental checkpoint is actually a file-sharing
> > >> > > strategy
> > >> > > > > across checkpoints, which means the state backend *could* reuse
> > >> files
> > >> > > > from
> > >> > > > > previous cp but not *must* do so. When the state backend could
> > not
> > >> > > reuse
> > >> > > > > the files, it is reasonable to fallback to a full checkpoint.
> > >> > > > >
> > >> > > > > Thus, I suggest we make it `execution.checkpointing.incremental`
> > >> and
> > >> > > > enable
> > >> > > > > it by default. For those state backends not supporting this,
> > they
> > >> > > perform
> > >> > > > > full checkpoints and print a warning to inform users. Users do
> > not
> > >> > need
> > >> > > > to
> > >> > > > > pay special attention to different options to control this
> > across
> > >> > > > different
> > >> > > > > state backends. This is more user-friendly in my opinion. WDYT?
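
To make the proposal concrete, the option being discussed could be declared and used roughly as sketched below. `ConfigOptions` and `Configuration` are existing Flink classes, but the key name, default value, and description here follow the suggestion in this thread and should be read as assumptions, not the released option.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class IncrementalCheckpointOptionSketch {

    // Key name and default taken from the proposal above; illustrative only.
    public static final ConfigOption<Boolean> CHECKPOINTING_INCREMENTAL =
            ConfigOptions.key("execution.checkpointing.incremental")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether checkpoints may share files with previous checkpoints. "
                                    + "State backends that cannot share files fall back to "
                                    + "full checkpoints and log a warning.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(CHECKPOINTING_INCREMENTAL, true);
        System.out.println(conf.get(CHECKPOINTING_INCREMENTAL)); // true
    }
}
```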
> > >> > > > >
> > >> > > > > On Tue, Jan 2, 2024 at 10:49 AM Rui Fan <1996fan...@gmail.com>
> > >> > wrote:
> > >> > > > >
> > >> > > > > > Hi Zakelly,
> > >> > > > > >
> > >> > > > > > I'm not sure whether we could add the state backend type in
> > the
> > >> > > > > > new key name of state.backend.incremental. It means we use
> > >> > > > > > `execution.checkpointing.rocksdb-incremental` or
> > >> > > > > > `execution.checkpointing.rocksdb-incremental.enabled`.
> > >> > > > > >
> > >> > > > > > So far, state.backend.incremental only works for rocksdb state
> > >> > > backend.
> > >> > > > > > And this feature or optimization is very valuable and huge for
> > >> > large
> > >> > > > > > state flink jobs. I believe it's enabled for most production
> > >> flink
> > >> > > jobs
> > >> > > > > > with large rocksdb state.
> > >> > > > > >
> > >> > > > > > If this option isn't generic for all state backend types, I
> > >> guess
> > >> > we
> > >> > > > > > can enable
> > `execution.checkpointing.rocksdb-incremental.enabled`
> > >> > > > > > by default in Flink 2.0.
> > >> > > > > >
> > >> > > > > > But if it works for all state backends, it's hard to enable it
> > >> by
> > >> > > > > default.
> > >> > > > > > Enabling great and valuable features or improvements are
> > useful
> > >> > > > > > for users, especially a lot of new flink users. Out-of-the-box
> > >> > > options
> > >> > > > > > are good for users.
> > >> > > > > >
> > >> > > > > > WDYT?
> > >> > > > > >
> > >> > > > > > Best,
> > >> > > > > > Rui
> > >> > > > > >
> > >> > > > > > On Fri, Dec 29, 2023 at 1:45 PM Zakelly Lan <
> > >> zakelly@gmail.com
> > >> > >
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi everyone,
> > >> > > > > > >
> > >> > > > > > > Thanks all for your comments!
> > >> > > > > > >
> > >> > > > > > > As many of you have questions about the names for boolean
> 

Re: Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2024-01-10 Thread Xuannan Su
Hi all,

After several rounds of offline discussions with Xingtong and Jinhao,
we have decided to narrow the scope of the FLIP. It will now focus on
introducing OperatorAttributes that indicate whether an operator emits
records only after inputs have ended. We will also use the attribute
to optimize task scheduling for better resource utilization. Setting
the backlog status and optimizing the operator implementation during
the backlog will be deferred to future work.

In addition to the change above, we have also made the following changes to
the FLIP to address the problems Dong mentioned:
- Public interfaces are updated to reuse the GlobalWindows.
- Instead of making all outputs of the upstream operators of the
"isOutputOnlyAfterEndOfStream=true" operator blocking, we only make
the output of the operator with "isOutputOnlyAfterEndOfStream=true"
blocking. This can prevent the second problem Dong mentioned. In the
future, we may introduce an extra OperatorAttributes to indicate if an
operator has any side output.

I would greatly appreciate any comment or feedback you may have on the
updated FLIP.
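
As a rough illustration of the narrowed scope, the attribute could be exposed through a small builder-style class like the sketch below. The class and method names (`OperatorAttributes`, `setOutputOnlyAfterEndOfStream`) follow the naming used in this thread but are assumptions, not the final public API.

```java
/**
 * Illustrative sketch only: the builder shape and names are assumptions
 * based on this discussion, not the final FLIP interface.
 */
public final class OperatorAttributes {

    private final boolean outputOnlyAfterEndOfStream;

    private OperatorAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    /** True if the operator only emits records once all of its inputs have ended. */
    public boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static final class Builder {
        private boolean outputOnlyAfterEndOfStream = false;

        public Builder setOutputOnlyAfterEndOfStream(boolean value) {
            this.outputOnlyAfterEndOfStream = value;
            return this;
        }

        public OperatorAttributes build() {
            return new OperatorAttributes(outputOnlyAfterEndOfStream);
        }
    }
}
```

An operator that only produces results at the end of its inputs, such as an aggregation over end-of-stream windows, would report `setOutputOnlyAfterEndOfStream(true)`, allowing the scheduler to mark its output as blocking and defer the deployment of downstream tasks.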

Best regards,
Xuannan

On Tue, Sep 26, 2023 at 11:24 AM Dong Lin  wrote:
>
> Hi all,
>
> Thanks for the review!
>
> Becket and I discussed this FLIP offline and we agreed on several things
> that need to be improved with this FLIP. I will summarize our discussion
> with the problems and TODOs. We will update the FLIP and let you know once
> the FLIP is ready for review again.
>
> 1) Investigate whether it is possible to update the existing GlobalWindows
> in a backward-compatible way and re-use it for the same purpose
> as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
> class.
>
> Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance
> which will not trigger window's computation even on end-of-inputs. We will
> need to investigate its existing usage and see if we can re-use it in a
> backward-compatible way.
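
For readers unfamiliar with the classes mentioned above: `GlobalWindows.create()` assigns every element to one global window, and its default trigger never fires, so reusing it for end-of-stream semantics means supplying a trigger that only fires when the end-of-input watermark arrives. The trigger below is a sketch of that idea and is not an existing Flink class.

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * Hypothetical trigger that fires the global window only when the watermark
 * reaches the window's max timestamp (Long.MAX_VALUE), i.e. at end of input.
 * Sketch only; not part of Flink's public API.
 */
public class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            GlobalWindow window, TriggerContext ctx) {
        // Register a callback for the window's max timestamp; for the global
        // window this is Long.MAX_VALUE, which only arrives once inputs end.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

Such a trigger could be plugged in via `window(GlobalWindows.create()).trigger(new EndOfStreamTrigger())`, which is the kind of backward-compatible reuse being investigated here.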
>
> 2) Let JM know whether any operator in the upstream of the operator with
> "isOutputOnEOF=true" will emit output via any side channel. The FLIP should
> update the execution mode of those operators *only if* all outputs from
> those operators are emitted only at the end of input.
>
> More specifically, the upstream operator might involve a user-defined
> operator that might emit output directly to an external service, where the
> emission operation is not explicitly expressed as an operator's output edge
> and thus not visible to JM. Similarly, it is also possible for the
> user-defined operator to register a timer
> via InternalTimerService#registerEventTimeTimer and emit output to an
> external service inside Triggerable#onEventTime. There is a chance that
> users still need related logic to output data in real-time, even if the
> downstream operators have isOutputOnEOF=true.
>
> One possible solution to address this problem is to add an extra
> OperatorAttribute to specify whether this operator might output records in
> such a way that does not go through operator's output (e.g. side output).
> Then the JM can safely enable the runtime optimization currently described
> in the FLIP when there is no such operator.
>
> 3) Create a follow-up FLIP that allows users to specify whether a source
> with Boundedness=bounded should have isProcessingBacklog=true.
>
> This capability would effectively introduce a 3rd strategy to set backlog
> status (in addition to FLIP-309 and FLIP-328). It might be useful to note
> that, even though the data in bounded sources are backlog data in most
> practical use-cases, it is not necessarily true. For example, users might
> want to start a Flink job to consume real-time data from a Kafka topic and
> specify that the job stops after 24 hours, which means the source is
> technically bounded while the data is fresh/real-time.
>
> This capability is more generic and can cover more use-case than
> EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
> useful in cases where users already need to specify this window assigner in
> a DataStream program, without bothering users to decide whether it is safe
> to treat data in a bounded source as backlog data.
>
>
> Regards,
> Dong
>
>
>
>
>
>
> On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan  wrote:
>
> > Hi, Dong,
> > Thanks for your efforts.
> >
> > +1 to this proposal,
> > I believe this will improve the performance in some mixture circumstances
> > of bounded and unbounded workloads.
> >
> > Best,
> > Yuxin
> >
> >
> > Xintong Song  于2023年9月18日周一 10:56写道:
> >
> > > Thanks for addressing my comments, Dong.
> > >
> > > LGTM.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu 
> > wrote:
> > >
> > > > Hi Dong & Jinhao,
> > > >
> > > > Thanks for your clarification! +1
> > > >
> > > > Best regards,
> > > > Wencong
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > 

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-09 Thread Xuannan Su
Hi Lu,

You can find information about the FLIP process on our wiki[1]. Please
let me know if you have any questions.

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Tue, Jan 9, 2024 at 5:36 AM Lu Niu  wrote:
>
> sounds good. Is the requirement to send an email thread about the voting? 
> What else is needed? What's the passing criteria?
>
> Best
> Lu
>
> On Sun, Jan 7, 2024 at 5:41 PM Xuannan Su  wrote:
>>
>> Hi Liu,
>>
>> The voting thread has been open for a long time. We may want to start
>> a new voting thread. WDYT?
>>
>> Best,
>> Xuannan
>>
>> On Sat, Jan 6, 2024 at 1:51 AM Lu Niu  wrote:
>> >
>> > Thank you Dong and Xuannan!
>> >
>> > Yes. We can take on this task. Any help during bootstrapping would be 
>> > greatly appreciated! I realize there is already a voting thread "[VOTE] 
>> > FLIP-329: Add operator attribute to specify support for object-reuse". 
>> > What else do we need?
>> >
>> > Best
>> > Lu
>> >
>> > On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su  wrote:
>> >>
>> >> Hi Lu,
>> >>
>> >> I believe this feature is very useful. However, I currently lack the
>> >> capacity to work on it in the near future. I think it would be great
>> >> if you could take on the task. I am willing to offer assistance if
>> >> there are any questions about the FLIP, or to review the PR if needed.
>> >>
>> >> Please let me know if you are interested in taking over this task. I
>> >> also think that we should start the voting thread if there are no further
>> >> comments on this FLIP.
>> >>
>> >> Best,
>> >> Xuannan
>> >>
>> >>
>> >>
>> >> On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
>> >> >
>> >> > Hi Lu,
>> >> >
>> >> > I am not actively working on Flink and this JIRA recently. If Xuannan 
>> >> > does not plan to work on this anytime soon, I personally think it will 
>> >> > be great if you can help work on this FLIP. Maybe we can start the 
>> >> > voting thread if there is no further comment on this FLIP.
>> >> >
>> >> > Xuannan, what do you think?
>> >> >
>> >> > Thanks,
>> >> > Dong
>> >> >
>> >> >
>> >> > On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> Is this still under active development? I notice 
>> >> >> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as 
>> >> >> deprioritized. If this is the case, would it be acceptable for us to 
>> >> >> take on the task?
>> >> >>
>> >> >> Best
>> >> >> Lu
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler 
>> >> >>  wrote:
>> >> >>>
>> >> >>> Hi Dong,
>> >> >>>
>> >> >>> Sorry for not seeing this initially. I did have one question about 
>> >> >>> the description of the issue in the FLIP:
>> >> >>>
>> >> >>> > However, in cases where the upstream and downstream operators do 
>> >> >>> > not store or access references to the input or output records, this 
>> >> >>> > deep-copy overhead becomes unnecessary
>> >> >>>
>> >> >>> I was interested in getting clarification as to what you meant by “or 
>> >> >>> access references…”, to see if it covered this situation:
>> >> >>>
>> >> >>> StreamX —forward--> operator1
>> >> >>> StreamX —forward--> operator2
>> >> >>>
>> >> >>> If operator1 modifies the record, and object re-use is enabled, then 
>> >> >>> operator2 will see the modified version, right?
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> — Ken
>> >> >>>
>> >> >>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  
>> >> >>> > wrote:
>> >> >>> >
>> >> >>> > Hi all,
>> >> >>> >
>> >> >>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> >> >>> > add operator attribute to allow operator to specify support for
>> >> >>> > object-reuse [1].
>> >> >>> >
>> >> >>> > Currently, the default configuration for pipeline.object-reuse is 
>> >> >>> > set
>> >> >>> > to false to avoid data corruption, which can result in suboptimal
>> >> >>> > performance. We propose adding APIs that operators can utilize to
>> >> >>> > inform the Flink runtime whether it is safe to reuse the emitted
>> >> >>> > records. This enhancement would enable Flink to maximize its
>> >> >>> > performance using the default configuration.
>> >> >>> >
>> >> >>> > Please refer to the FLIP document for more details about the 
>> >> >>> > proposed
>> >> >>> > design and implementation. We welcome any feedback and opinions on
>> >> >>> > this proposal.
>> >> >>> >
>> >> >>> > Best regards,
>> >> >>> >
>> >> >>> > Dong and Xuannan
>> >> >>> >
>> >> >>> > [1] 
>> >> >>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>> >> >>>
>> >> >>> --
>> >> >>> Ken Krugler
>> >> >>> http://www.scaleunlimited.com
>> >> >>> Custom big data solutions
>> >> >>> Flink & Pinot
>> >> >>>
>> >> >>>
>> >> >>>


[VOTE] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Xuannan Su
Hi everyone,

Thanks for all the feedback about the FLIP-405: Migrate string
configuration key to ConfigOption [1] [2].

I'd like to start a vote for it. The vote will be open for at least 72
hours (excluding weekends, until Jan 11, 12:00 AM GMT) unless there is an
objection or an insufficient number of votes.



[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-405%3A+Migrate+string+configuration+key+to+ConfigOption
[2] https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0


Best,
Xuannan


Re: [DISCUSS] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-07 Thread Xuannan Su
Hi all,

Thanks for the discussion. I think all the comments and questions have
been addressed. I will open the voting thread today.

Best,
Xuannan


On Tue, Jan 2, 2024 at 11:59 AM Xuannan Su  wrote:
>
> Hi all,
>
> Thank you for all your comments! The FLIP has been updated
> accordingly. Please let me know if you have any further questions or
> comments.
>
> Also, note that many people are on Christmas break, so we will keep
> the discussion open for another week.
>
> Best,
> Xuannan
>
> On Wed, Dec 27, 2023 at 5:20 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > After some investigation, it turns out those options of input/output
> > > format are only publicly exposed in the DataSet docs[2], which is
> > > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> > > looks fine to me.
> >
> > Thanks Xuannan for the detailed investigation, if so, deprecate them
> > and removing them in Flink 2.0 looks good to me.
> >
> > > I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> > > 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> > > instead of 'minicluster.number-taskmanager'.
> >
> > Thanks Hang for the good suggestion! 'minicluster.number-of-taskmanagers'
> > sounds good to me, it's similar to taskmanager.numberOfTaskSlots.
> >
> > Best,
> > Rui
> >
> > On Wed, Dec 27, 2023 at 1:56 PM Hang Ruan  wrote:
> >>
> >> Hi, Rui Fan.
> >>
> >> Thanks for this FLIP.
> >>
> >> I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> >> 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> >> instead of 'minicluster.number-taskmanager'.
> >>
> >> Best,
> >> Hang
> >>
> >> Xuannan Su  于2023年12月27日周三 12:40写道:
> >>
> >> > Hi Xintong and Rui,
> >> >
> >> > Thanks for the quick feedback and the suggestions.
> >> >
> >> > > 1. I think the default value for `TASK_MANAGER_LOG_PATH_KEY` should be
> >> > "no
> >> > > default".
> >> >
> >> > I have considered both ways of describing the default value. However,
> >> > I found out that some of the configurations, such as `web.tmpdir`, put
> >> > `System.getProperty()` in the default value [1]. Some are putting the
> >> > description in the default value column[2]. So I just picked the first
> >> > one. I am fine with either way, so long as they are consistent. WDYT?
> >> >
> >> > > 3. Simply saying "getting / setting value with string key is 
> >> > > discouraged"
> >> > > in JavaDoc of get/setString is IMHO a bit confusing. People may have 
> >> > > the
> >> > > question why would we keep the discouraged interfaces at all. I would
> >> > > suggest the following:
> >> > > ```
> >> > > We encourage users and developers to always use ConfigOption for 
> >> > > getting
> >> > /
> >> > > setting the configurations if possible, for its rich description, type,
> >> > > default-value and other supports. The string-key-based getter / setter
> >> > > should only be used when ConfigOption is not applicable, e.g., the key 
> >> > > is
> >> > > programmatically generated in runtime.
> >> > > ```
> >> >
> >> > The suggested comment looks good to me. Thanks for the suggestion. I
> >> > will update the comment in the FLIP.
> >> >
> >> > > 2. So I wonder if we can simply mark them as deprecated and remove in
> >> > 2.0.
> >> >
> >> > After some investigation, it turns out those options of input/output
> >> > format are only publicly exposed in the DataSet docs[2], which is
> >> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> >> > looks fine to me.
> >> >
> >> >
> >> > @Rui
> >> >
>> > > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
>> > > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)`
>> > > methods?
>> >
>> > +1 Only keep the `get(ConfigOption<T> option)`,
>> > `getOptional(ConfigOption<T> option)`, and `set(ConfigOption<T> option, T
>> > value)` methods.
> >> >
> >> > Best,
> >> > Xuannan
> >> >
> >> > [1]

Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-07 Thread Xuannan Su
Hi Liu,

The voting thread has been open for a long time. We may want to start
a new voting thread. WDYT?

Best,
Xuannan

On Sat, Jan 6, 2024 at 1:51 AM Lu Niu  wrote:
>
> Thank you Dong and Xuannan!
>
> Yes. We can take on this task. Any help during bootstrapping would be greatly 
> appreciated! I realize there is already a voting thread "[VOTE] FLIP-329: Add 
> operator attribute to specify support for object-reuse". What else do we need?
>
> Best
> Lu
>
> On Fri, Jan 5, 2024 at 12:46 AM Xuannan Su  wrote:
>>
>> Hi Lu,
>>
>> I believe this feature is very useful. However, I currently lack the
>> capacity to work on it in the near future. I think it would be great
>> if you could take on the task. I am willing to offer assistance if
>> there are any questions about the FLIP, or to review the PR if needed.
>>
>> Please let me know if you are interested in taking over this task. I
>> also think that we should start the voting thread if there are no further
>> comments on this FLIP.
>>
>> Best,
>> Xuannan
>>
>>
>>
>> On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
>> >
>> > Hi Lu,
>> >
>> > I am not actively working on Flink and this JIRA recently. If Xuannan does 
>> > not plan to work on this anytime soon, I personally think it will be great 
>> > if you can help work on this FLIP. Maybe we can start the voting thread if 
>> > there is no further comment on this FLIP.
>> >
>> > Xuannan, what do you think?
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
>> >>
>> >> Hi,
>> >>
>> >> Is this still under active development? I notice 
>> >> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as 
>> >> deprioritized. If this is the case, would it be acceptable for us to take 
>> >> on the task?
>> >>
>> >> Best
>> >> Lu
>> >>
>> >>
>> >>
>> >> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler  
>> >> wrote:
>> >>>
>> >>> Hi Dong,
>> >>>
>> >>> Sorry for not seeing this initially. I did have one question about the 
>> >>> description of the issue in the FLIP:
>> >>>
>> >>> > However, in cases where the upstream and downstream operators do not 
>> >>> > store or access references to the input or output records, this 
>> >>> > deep-copy overhead becomes unnecessary
>> >>>
>> >>> I was interested in getting clarification as to what you meant by “or 
>> >>> access references…”, to see if it covered this situation:
>> >>>
>> >>> StreamX —forward--> operator1
>> >>> StreamX —forward--> operator2
>> >>>
>> >>> If operator1 modifies the record, and object re-use is enabled, then 
>> >>> operator2 will see the modified version, right?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> — Ken
>> >>>
>> >>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
>> >>> >
>> >>> > Hi all,
>> >>> >
>> >>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> >>> > add operator attribute to allow operator to specify support for
>> >>> > object-reuse [1].
>> >>> >
>> >>> > Currently, the default configuration for pipeline.object-reuse is set
>> >>> > to false to avoid data corruption, which can result in suboptimal
>> >>> > performance. We propose adding APIs that operators can utilize to
>> >>> > inform the Flink runtime whether it is safe to reuse the emitted
>> >>> > records. This enhancement would enable Flink to maximize its
>> >>> > performance using the default configuration.
>> >>> >
>> >>> > Please refer to the FLIP document for more details about the proposed
>> >>> > design and implementation. We welcome any feedback and opinions on
>> >>> > this proposal.
>> >>> >
>> >>> > Best regards,
>> >>> >
>> >>> > Dong and Xuannan
>> >>> >
>> >>> > [1] 
>> >>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>> >>>
>> >>> --
>> >>> Ken Krugler
>> >>> http://www.scaleunlimited.com
>> >>> Custom big data solutions
>> >>> Flink & Pinot
>> >>>
>> >>>
>> >>>


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2024-01-05 Thread Xuannan Su
Hi Lu,

I believe this feature is very useful. However, I currently lack the
capacity to work on it in the near future. I think it would be great
if you could take on the task. I am willing to offer assistance if
there are any questions about the FLIP, or to review the PR if needed.

Please let me know if you are interested in taking over this task. I
also think that we should start the voting thread if there are no further
comments on this FLIP.

Best,
Xuannan



On Fri, Jan 5, 2024 at 2:23 PM Dong Lin  wrote:
>
> Hi Lu,
>
> I am not actively working on Flink and this JIRA recently. If Xuannan does 
> not plan to work on this anytime soon, I personally think it will be great if 
> you can help work on this FLIP. Maybe we can start the voting thread if there 
> is no further comment on this FLIP.
>
> Xuannan, what do you think?
>
> Thanks,
> Dong
>
>
> On Fri, Jan 5, 2024 at 2:03 AM Lu Niu  wrote:
>>
>> Hi,
>>
>> Is this still under active development? I notice 
>> https://issues.apache.org/jira/browse/FLINK-32476 is labeled as 
>> deprioritized. If this is the case, would it be acceptable for us to take on 
>> the task?
>>
>> Best
>> Lu
>>
>>
>>
>> On Thu, Oct 19, 2023 at 4:26 PM Ken Krugler  
>> wrote:
>>>
>>> Hi Dong,
>>>
>>> Sorry for not seeing this initially. I did have one question about the 
>>> description of the issue in the FLIP:
>>>
>>> > However, in cases where the upstream and downstream operators do not 
>>> > store or access references to the input or output records, this deep-copy 
>>> > overhead becomes unnecessary
>>>
>>> I was interested in getting clarification as to what you meant by “or 
>>> access references…”, to see if it covered this situation:
>>>
>>> StreamX —forward--> operator1
>>> StreamX —forward--> operator2
>>>
>>> If operator1 modifies the record, and object re-use is enabled, then 
>>> operator2 will see the modified version, right?
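
The scenario described here can be written down as a small program: one stream feeding two forward-connected map operators, where the first mutates the record in place. Whether the second observes that mutation is exactly what the object-reuse setting (and the proposed attribute) governs. The classes below are a sketch for illustration only.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardReuseScenarioSketch {

    /** Simple mutable POJO; mutating it in one branch is what makes reuse risky. */
    public static class Event {
        public String payload = "original";
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.getConfig().enableObjectReuse(); // the setting under discussion

        DataStream<Event> streamX = env.fromElements(new Event());

        // operator1 mutates the record in place.
        streamX.map(new MapFunction<Event, Event>() {
            @Override
            public Event map(Event e) {
                e.payload = "mutated by operator1";
                return e;
            }
        }).print("operator1");

        // operator2 only reads; with object reuse enabled and both maps chained
        // to the same producer, it may observe operator1's in-place change.
        streamX.map(new MapFunction<Event, Event>() {
            @Override
            public Event map(Event e) {
                return e;
            }
        }).print("operator2");

        env.execute("forward reuse scenario");
    }
}
```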
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> > On Jul 2, 2023, at 7:24 PM, Xuannan Su  wrote:
>>> >
>>> > Hi all,
>>> >
>>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>>> > add operator attribute to allow operator to specify support for
>>> > object-reuse [1].
>>> >
>>> > Currently, the default configuration for pipeline.object-reuse is set
>>> > to false to avoid data corruption, which can result in suboptimal
>>> > performance. We propose adding APIs that operators can utilize to
>>> > inform the Flink runtime whether it is safe to reuse the emitted
>>> > records. This enhancement would enable Flink to maximize its
>>> > performance using the default configuration.
>>> >
>>> > Please refer to the FLIP document for more details about the proposed
>>> > design and implementation. We welcome any feedback and opinions on
>>> > this proposal.
>>> >
>>> > Best regards,
>>> >
>>> > Dong and Xuannan
>>> >
>>> > [1] 
>>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>>
>>> --
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink & Pinot
>>>
>>>
>>>


Re: [DISCUSS] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-01 Thread Xuannan Su
Hi all,

Thank you for all your comments! The FLIP has been updated
accordingly. Please let me know if you have any further questions or
comments.

Also, note that many people are on Christmas break, so we will keep
the discussion open for another week.

Best,
Xuannan

On Wed, Dec 27, 2023 at 5:20 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > After some investigation, it turns out those options of input/output
> > format are only publicly exposed in the DataSet docs[2], which is
> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> > looks fine to me.
>
> Thanks Xuannan for the detailed investigation, if so, deprecate them
> and removing them in Flink 2.0 looks good to me.
>
> > I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> > 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> > instead of 'minicluster.number-taskmanager'.
>
> Thanks Hang for the good suggestion! 'minicluster.number-of-taskmanagers'
> sounds good to me, it's similar to taskmanager.numberOfTaskSlots.
>
> Best,
> Rui
>
> On Wed, Dec 27, 2023 at 1:56 PM Hang Ruan  wrote:
>>
>> Hi, Rui Fan.
>>
>> Thanks for this FLIP.
>>
>> I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
>> 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
>> instead of 'minicluster.number-taskmanager'.
>>
>> Best,
>> Hang
>>
>> Xuannan Su  于2023年12月27日周三 12:40写道:
>>
>> > Hi Xintong and Rui,
>> >
>> > Thanks for the quick feedback and the suggestions.
>> >
>> > > 1. I think the default value for `TASK_MANAGER_LOG_PATH_KEY` should be
>> > "no
>> > > default".
>> >
>> > I have considered both ways of describing the default value. However,
>> > I found out that some of the configurations, such as `web.tmpdir`, put
>> > `System.getProperty()` in the default value [1]. Some are putting the
>> > description in the default value column[2]. So I just picked the first
>> > one. I am fine with either way, so long as they are consistent. WDYT?
>> >
>> > > 3. Simply saying "getting / setting value with string key is discouraged"
>> > > in JavaDoc of get/setString is IMHO a bit confusing. People may have the
>> > > question why would we keep the discouraged interfaces at all. I would
>> > > suggest the following:
>> > > ```
>> > > We encourage users and developers to always use ConfigOption for getting
>> > /
>> > > setting the configurations if possible, for its rich description, type,
>> > > default-value and other supports. The string-key-based getter / setter
>> > > should only be used when ConfigOption is not applicable, e.g., the key is
>> > > programmatically generated in runtime.
>> > > ```
>> >
>> > The suggested comment looks good to me. Thanks for the suggestion. I
>> > will update the comment in the FLIP.
>> >
>> > > 2. So I wonder if we can simply mark them as deprecated and remove in
>> > 2.0.
>> >
>> > After some investigation, it turns out those options of input/output
>> > format are only publicly exposed in the DataSet docs[2], which is
>> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
>> > looks fine to me.
>> >
>> >
>> > @Rui
>> >
>> > > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
>> > > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)` methods?
>> >
>> > +1 Only keep the `get(ConfigOption<T> option)`,
>> > `getOptional(ConfigOption<T> option)`, and `set(ConfigOption<T> option, T
>> > value)` methods.
>> >
>> > Best,
>> > Xuannan
>> >
>> > [1]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#web-tmpdir
>> > [2]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-container-image-ref
>> > [3]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/overview/#data-sources
>> >
>> >
>> >
>> >
>> > On Tue, Dec 26, 2023 at 8:47 PM Xintong Song 
>> > wrote:
>> > >
>> > > >
>> > > > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
>> > > > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)`
>> > methods?
>> > >
>> > >
>> > >
>> >

Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2023-12-28 Thread Xuannan Su
Hi Zakelly,

Thanks for driving this! The organization of the configuration option
in the FLIP looks much cleaner and easier to understand. +1 to the
FLIP.

Just some questions from me.

1. I think the changes to the ConfigOptions should be put in the
`Public Interfaces` section, instead of `Proposed Changes`, as those
configuration options are part of the public interface.

2. The key `state.checkpoint.cleaner.parallel-mode` seems confusing.
It reads as if it selects among several modes, while in fact it is a
boolean flag indicating whether parallel cleaning is enabled. How about
making it `state.checkpoint.cleaner.parallel-mode.enabled`?

3. `execution.checkpointing.write-buffer` might better be named
`execution.checkpointing.write-buffer-size`, so that it is clear the
option configures the size of the buffer.
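
A sketch of how the two renamed options suggested in points 2 and 3 might be declared, with the old key kept as a deprecated fallback; the defaults, descriptions, and the fallback choice are illustrative assumptions, not the FLIP's final definitions.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

public class RenamedCheckpointOptionsSketch {

    // Boolean flag with an explicit ".enabled" suffix, as suggested above.
    public static final ConfigOption<Boolean> CLEANER_PARALLEL_MODE_ENABLED =
            ConfigOptions.key("state.checkpoint.cleaner.parallel-mode.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDeprecatedKeys("state.checkpoint.cleaner.parallel-mode")
                    .withDescription("Whether checkpoint artifacts are cleaned up in parallel.");

    // Size-typed option whose key states what is being configured.
    public static final ConfigOption<MemorySize> CHECKPOINTING_WRITE_BUFFER_SIZE =
            ConfigOptions.key("execution.checkpointing.write-buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("4kb")) // illustrative default
                    .withDescription("Size of the write buffer used when writing checkpoint streams.");
}
```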

Best,
Xuannan


On Wed, Dec 27, 2023 at 7:17 PM Yanfei Lei  wrote:
>
> Hi Zakelly,
>
> > Considering the name occupation, how about naming it as 
> > `execution.checkpointing.type`?
>
> `Checkpoint Type`[1,2] is used to describe aligned/unaligned
> checkpoint, I am inclined to make a choice between
> `execution.checkpointing.incremental` and
> `execution.checkpointing.incremental.enabled`.
>
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/monitoring/checkpoint_monitoring/
> [2] 
> https://github.com/apache/flink/blob/master/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html#L27
>
> --
> Best,
> Yanfei
>
> Zakelly Lan  于2023年12月27日周三 14:41写道:
> >
> > Hi Lijie,
> >
> > Thanks for the reminder! I missed this.
> >
> > Considering the name occupation, how about naming it as
> > `execution.checkpointing.type`?
> >
> > Actually I think the current `execution.checkpointing.mode` is confusing in
> > some ways, maybe `execution.checkpointing.data-consistency` is better.
> >
> >
> > Best,
> > Zakelly
> >
> >
> > On Wed, Dec 27, 2023 at 12:59 PM Lijie Wang 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > >> I'm wondering if `execution.checkpointing.savepoint-dir` would be
> > > better.
> > >
> > > `execution.checkpointing.dir` and `execution.checkpointing.savepoint-dir`
> > > are also fine for me.
> > >
> > > >> So I think an enumeration option `execution.checkpointing.mode` which
> > > can be 'full' (default) or 'incremental' would be better
> > >
> > > I agree with using an enumeration option. But currently there is already a
> > > configuration option called `execution.checkpointing.mode`, which is used
> > > to choose EXACTLY_ONCE or AT_LEAST_ONCE. Maybe we need to use another name
> > > or merge these two options.
> > >
> > > Best,
> > > Lijie
> > >
> > > Zakelly Lan  于2023年12月27日周三 11:43写道:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks all for your comments!
> > > >
> > > > @Yanfei
> > > >
> > > > > 1. For some state backends that do not support incremental checkpoint,
> > > > > how does the execution.checkpointing.incrementaloption take effect? Or
> > > > > is it better to put incremental under state.backend.xxx.incremental?
> > > > >
> > > > I'd rather not put the option for incremental checkpoint under the
> > > > 'state.backend', since it is more about the checkpointing instead of
> > > state
> > > > accessing. Of course, the state backend may not necessarily do
> > > incremental
> > > > checkpoint as requested. If the state backend is not capable of taking
> > > > incremental cp, it is better to fallback to the full cp.
> > > >
> > > > 2. I'm a little worried that putting all configurations into
> > > > > `ExecutionCheckpointingOptions` will introduce some dependency
> > > > > problems. Some options would be used by flink-runtime module, but
> > > > > flink-runtime should not depend on flink-streaming-java. e.g.
> > > > > FLINK-28286[1].
> > > > > So, I prefer to move configurations to `CheckpointingOptions`, WDYT?
> > > > >
> > > >
> > > > Yes, that's a very good point.  Moving to
> > > > `CheckpointingOptions`(flink-core) makes sense.
> > > >
> > > > @Lijie
> > > >
> > > > How about
> > > > > state.savepoints.dir -> execution.checkpointing.savepoint.dir
> > > > > state.checkpoints.dir -> execution.checkpointing.checkpoint.dir
> > > >
> > > >
> > > > Actually, I think the `checkpointing.checkpoint` may cause some
> > > confusion.
> > > > But I'm ok if others agree.
> > > > I'm wondering if `execution.checkpointing.savepoint-dir` would be 
> > > > better.
> > > > WDYT?
> > > >
> > > > 2. We changed the execution.checkpointing.local-copy' to
> > > > > 'execution.checkpointing.local-copy.enabled'. Should we also add
> > > > "enabled"
> > > > > suffix for other boolean type configuration options ? For example,
> > > > > execution.checkpointing.incremental ->
> > > > > execution.checkpointing.incremental.enabled
> > > > >
> > > >
> > > > Actually, the incremental cp is something like choosing a mode for doing
> > > > checkpoint instead of enabling a function. So I think an enumeration
> > > option
> > > > `execution.checkpointing.mode` which 

Re: [DISCUSS] FLIP-405: Migrate string configuration key to ConfigOption

2023-12-26 Thread Xuannan Su
Hi Xintong and Rui,

Thanks for the quick feedback and the suggestions.

> 1. I think the default value for `TASK_MANAGER_LOG_PATH_KEY` should be "no
> default".

I have considered both ways of describing the default value. However,
I found out that some of the configurations, such as `web.tmpdir`, put
`System.getProperty()` in the default value [1], while others put the
description in the default value column [2]. So I just picked the first
one. I am fine with either way, so long as they are consistent. WDYT?
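
The two documentation styles being compared look roughly like this when defining the options; the keys, defaults, and descriptions below are made up for illustration and are not the actual Flink definitions.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class DefaultValueStylesSketch {

    // Style 1: the default is computed from a system property, as `web.tmpdir` does.
    public static final ConfigOption<String> TMP_DIR =
            ConfigOptions.key("example.tmpdir")
                    .stringType()
                    .defaultValue(System.getProperty("java.io.tmpdir"))
                    .withDescription("Local directory used for temporary files.");

    // Style 2: no programmatic default; the effective default is explained in the docs.
    public static final ConfigOption<String> LOG_PATH =
            ConfigOptions.key("example.log.path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the log file. If not set, it is derived from the "
                                    + "environment of the running process.");
}
```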

> 3. Simply saying "getting / setting value with string key is discouraged"
> in JavaDoc of get/setString is IMHO a bit confusing. People may have the
> question why would we keep the discouraged interfaces at all. I would
> suggest the following:
> ```
> We encourage users and developers to always use ConfigOption for getting /
> setting the configurations if possible, for its rich description, type,
> default-value and other supports. The string-key-based getter / setter
> should only be used when ConfigOption is not applicable, e.g., the key is
> programmatically generated in runtime.
> ```

The suggested comment looks good to me. Thanks for the suggestion. I
will update the comment in the FLIP.

> 2. So I wonder if we can simply mark them as deprecated and remove in 2.0.

After some investigation, it turns out those input/output format
options are only publicly exposed in the DataSet docs [3], which are
deprecated. Thus, marking them as deprecated and removing them in Flink
2.0 looks fine to me.


@Rui

> Configuration has a `public <T> T get(ConfigOption<T> option)` method.
> Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)` methods?

+1 Only keep the `get(ConfigOption<T> option)`,
`getOptional(ConfigOption<T> option)`, and `set(ConfigOption<T> option, T
value)` methods.
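
To illustrate what keeping only the generic accessors looks like in practice, here is a minimal usage sketch; the `example.probe-interval` option is made up for illustration.

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class TypedAccessSketch {

    // Hypothetical option, defined once with key, type, default, and description.
    private static final ConfigOption<Duration> PROBE_INTERVAL =
            ConfigOptions.key("example.probe-interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDescription("How often the example component probes its input.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // The three accessors this thread proposes to keep:
        conf.set(PROBE_INTERVAL, Duration.ofSeconds(10));                     // set(ConfigOption<T>, T)
        Duration interval = conf.get(PROBE_INTERVAL);                         // get(ConfigOption<T>)
        boolean explicitlySet = conf.getOptional(PROBE_INTERVAL).isPresent(); // getOptional(ConfigOption<T>)

        System.out.println(interval + ", explicitly set: " + explicitlySet);
    }
}
```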

Best,
Xuannan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#web-tmpdir
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-container-image-ref
[3] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/overview/#data-sources




On Tue, Dec 26, 2023 at 8:47 PM Xintong Song  wrote:
>
> >
> > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
> > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)` methods?
>
>
>
> Note: all `public void setXxx(ConfigOption<Xxx> key, Xxx value)` methods
> > can be replaced with `public <T> Configuration set(ConfigOption<T> option,
> > T value)` as well.
>
>
> +1
>
>
> Best,
>
> Xintong
>
>
>
> On Tue, Dec 26, 2023 at 8:44 PM Xintong Song  wrote:
>
> > These features don't have a public option, but they work. I'm not sure
> >> whether these features are used by some advanced users.
> >> Actually, I think some of them are valuable! For example:
> >>
> >> - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE
> >>   allows users to define the start command of the yarn container.
> >> - FileInputFormat.ENUMERATE_NESTED_FILES_FLAG allows
> >>   flink job reads all files under the directory even if it has nested
> >> directories.
> >>
> >> This FLIP focuses on the refactor option, I'm afraid these features are
> >> used
> >> in some production and removing these features will affect some flink
> >> jobs.
> >> So I prefer to keep these features, WDTY?
> >>
> >
> > First of all, I don't think we should support any knobs that users can
> > only learn how to use from reading Flink's internal codes. From this
> > perspective, for existing string-keyed knobs that are not mentioned in any
> > public documentation, yes we can argue that they are functioning, but we
> > can also argue that they are not really exposed to users. That means
> > migrating them to ConfigOption is not a pure refactor, but would make
> > something that used to be hidden from users now exposed to users. For such
> > options, I personally would lean toward not exposing them. If we consider
> > them as already exposed, then process-wise there's no problem in
> > deprecating some infrequently-used options and removing them in a major
> > version bump, and if they are proved needed later we can add them back
> > anytime. On the other hand, if we consider them as not yet exposed, then
> > removing them later would be a breaking change.
> >
> >
> > Secondly, I don't really come up with any cases where users need to tune
> > these knobs. E.g., why would we allow users to customize the yarn container
> > start command while we already provide `env.java.opts`? And what would be
> > the problem if Flink just doesn't support nested files? And even worse,
> > such knobs may provide chances for users to shoot themself in the foot.
> > E.g., what if %jvmmem% is missing from a user-provided container start
> > command? Admittedly, there might be a small fraction of advanced users that
> > know how to use these knobs. However, those users usually have their own
> > custom fork of Flink, and it should not be a big problem for them to build
> > such abilities by 

Re: [DISCUSS] Should Configuration support getting value based on String key?

2023-12-19 Thread Xuannan Su
Hi Rui,

I am fine with keeping `String getString(String key, String
defaultValue)` if more people favor it. However, we should let users
know that we encourage using ConfigOptions over string-based
configuration keys; as Timo said, we should add that message to the
`String getString(String key, String defaultValue)` method.

Best,
Xuannan

On Tue, Dec 19, 2023 at 7:55 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > > I noticed that Configuration is used in
> > > DistributedCache#writeFileInfoToConfig and readFileInfoFromConfig
> > > to store some cacheFile meta-information. Their keys are
> > > temporary(key name with number) and it is not convenient
> > > to predefine ConfigOption.
> >
> >
> > True, this one requires a bit more effort to migrate from string-key to
> > ConfigOption, but still should be doable. Looking at how the two mentioned
> > methods are implemented and used, it seems what is really needed is
> > serialization and deserialization of `DistributedCacheEntry`-s. And all the
> > entries are always written / read at once. So I think we can serialize the
> > whole set of entries into a JSON string (or something similar), and use one
> > ConfigOption with a deterministic key for it, rather than having one
> > ConfigOption for each field in each entry. WDYT?
>
> Hi Xintong, thanks for the good suggestion! Most of the entries can be
> serialized to a JSON string, and we only write/read them all at once.
> The CACHE_FILE_BLOB_KEY is special: its type is byte[], so we need to
> store it via setBytes/getBytes.
>
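A sketch of the single-option approach discussed above, with the binary blob kept out of the JSON payload as noted; the entry fields, option key, and use of Jackson here are all assumptions for illustration, not the actual DistributedCache implementation.

```java
import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class CacheEntrySerializationSketch {

    /** Simplified stand-in for DistributedCache.DistributedCacheEntry. */
    public static class CacheEntry {
        public String name;
        public String filePath;
        public boolean isExecutable;
        public boolean isZipped;
    }

    // Hypothetical single option holding all entries as one JSON string.
    private static final ConfigOption<String> CACHE_ENTRIES_JSON =
            ConfigOptions.key("internal.cache.entries").stringType().noDefaultValue();

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void writeEntries(Configuration conf, List<CacheEntry> entries)
            throws JsonProcessingException {
        // All entries are written at once as one JSON string.
        conf.set(CACHE_ENTRIES_JSON, MAPPER.writeValueAsString(entries));
        // Binary blob keys would still use conf.setBytes(...) separately.
    }

    public static List<CacheEntry> readEntries(Configuration conf)
            throws JsonProcessingException {
        return MAPPER.readValue(
                conf.get(CACHE_ENTRIES_JSON), new TypeReference<List<CacheEntry>>() {});
    }
}
```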
> Also, I had an offline discussion with @Xuannan Su: refactoring all code
> that uses String keys requires a separate FLIP. We will provide a
> detailed FLIP later.
>
> --
>
> Hi all, thanks everyone for the lively discussion. It's really a trade-off to
> keep "String getString(String key, String defaultValue)" or not.
> (It's not a right or wrong thing.)
>
> Judging from the discussion, most discussants can accept keeping
> `String getString(String key, String defaultValue)` and deprecating the
> rest of the `getXxx(String key, Xxx defaultValue)` methods.
>
> cc @Xintong Song @Xuannan Su , WDYT?
>
> Best,
> Rui
>
> On Fri, Dec 15, 2023 at 11:38 AM Zhu Zhu  wrote:
>>
>> I think it's not clear whether forcing using ConfigOption would hurt
>> the user experience.
>>
>> Maybe it does at the beginning, because using string keys to access
>> Flink configuration can be simpler for new components/jobs.
>> However, problems may happen later if the configuration usages become
>> more complex, like key renaming, using types other than strings, and
>> other problems that ConfigOption was invented to address.
>>
>> Personally I prefer to encourage the usage of ConfigOption.
>> Jobs should use GlobalJobParameter for custom config, which is different
>> from the Configuration interface. Therefore, Configuration is mostly
>> used in other components/plugins, in which case the long-term maintenance
>> can be important.
>>
>> However, since it is not a right or wrong choice, I'd also be fine
>> to keep the `getString()` method if more devs/users are in favor of it.
>>
>> Thanks,
>> Zhu
>>
>> Timo Walther  于2023年12月14日周四 17:41写道:
>>
>> > The configuration in Flink is complicated and I fear we won't have
>> > enough capacity to substantially fix it. The introduction of
>> > ReadableConfig, WritableConfig, and typed ConfigOptions was a right step
>> > into making the code more maintainable. From the Flink side, every read
>> > access should go through ConfigOption.
>> >
>> > However, I also understand Gyula pragmatism here because (practically
>> > speaking) users get access `getString()` via `toMap().get()`. So I'm
>> > fine with removing the deprecation for functionality that is available
>> > anyways. We should, however, add the message to `getString()` that this
>> > method is discouraged and `get(ConfigOption)` should be the preferred
>> > way of accessting Configuration.
>> >
>> > In any case we should remove the getInt and related methods.
>> >
>> > Cheers,
>> > Timo
>> >
>> >
>> > On 14.12.23 09:56, Gyula Fóra wrote:
>> > > I see a strong value for user facing configs to use ConfigOption and this
>> > > should definitely be an enforced convention.
>> > >
>> > > However with the Flink project growing and many other components and even
>>

[jira] [Created] (FLINK-33890) Determine the initial status before receiving the first RecordAttributes

2023-12-19 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33890:
--

 Summary: Determine the initial status before receiving the first 
RecordAttributes 
 Key: FLINK-33890
 URL: https://issues.apache.org/jira/browse/FLINK-33890
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Currently, all the operators are initialized in non-backlog mode. Ideally, we
should determine the initial status before receiving the first
{{RecordAttributes}} so that we don't have to initialize the operator in
non-backlog mode and then immediately switch to backlog mode before processing
any records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-396: Trial to test GitHub Actions as an alternative for Flink's current Azure CI infrastructure

2023-12-13 Thread Xuannan Su
Thanks for driving this!

+1(non-binding)

Best,
Xuannan

On Thu, Dec 14, 2023 at 11:04 AM Xintong Song  wrote:
>
> +1 (binding)
>
> Best,
>
> Xintong
>
>
>
> On Thu, Dec 14, 2023 at 10:05 AM Jiabao Sun 
> wrote:
>
> > Thanks Matthias for this hard work!
> >
> > +1(non-binding)
> >
> > Best,
> > Jiabao
> >
> >
> > > 2023年12月14日 09:57,Leonard Xu  写道:
> > >
> > > +1(binding)
> > >
> > >
> > > Best,
> > > Leonard
> > >
> > >> 2023年12月13日 下午10:59,Benchao Li  写道:
> > >>
> > >> +1 (binding)
> > >>
> > >> Thanks Matthias for driving it!
> > >>
> > >> Etienne Chauchot  于2023年12月13日周三 21:35写道:
> > >>>
> > >>> Thanks Matthias for your hard work !
> > >>>
> > >>> +1 (binding)
> > >>>
> > >>> Best
> > >>>
> > >>> Etienne
> > >>>
> > >>> Le 12/12/2023 à 11:23, Lincoln Lee a écrit :
> >  +1 (binding)
> > 
> >  Thanks for driving this!
> > 
> >  Best,
> >  Lincoln Lee
> > 
> > 
> >  Yun Tang  于2023年12月12日周二 17:52写道:
> > 
> > > Thanks for Matthias driving this work!
> > >
> > > +1 (binding)
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Yangze Guo
> > > Sent: Tuesday, December 12, 2023 16:12
> > > To:dev@flink.apache.org  
> > > Subject: Re: [VOTE] FLIP-396: Trial to test GitHub Actions as an
> > > alternative for Flink's current Azure CI infrastructure
> > >
> > > +1 (binding)
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Dec 12, 2023 at 3:51 PM Yuxin Tan
> > wrote:
> > >> +1 (non binding)
> > >> Thanks for the effort.
> > >>
> > >> Best,
> > >> Yuxin
> > >>
> > >>
> > >> Samrat Deb  于2023年12月12日周二 15:25写道:
> > >>
> > >>> +1 (non binding)
> > >>> Thanks for driving
> > >>>
> > >>> On Tue, 12 Dec 2023 at 11:59 AM, Sergey Nuyanzin<
> > snuyan...@gmail.com>
> > >>> wrote:
> > >>>
> >  +1 (binding)
> > 
> >  Thanks for driving this
> > 
> >  On Tue, Dec 12, 2023, 07:22 Rui Fan<1996fan...@gmail.com>  wrote:
> > 
> > > +1(binding)
> > >
> > > Best,
> > > Rui
> > >
> > > On Tue, Dec 12, 2023 at 11:58 AM weijie guo <
> > > guoweijieres...@gmail.com
> > > wrote:
> > >
> > >> Thanks Matthias for this efforts.
> > >>
> > >> +1(binding)
> > >>
> > >>
> > >> Best regards,
> > >>
> > >> Weijie
> > >>
> > >>
> > >> Matthias Pohl  于2023年12月11日周一
> > >>> 21:51写道:
> > >>> Hi everyone,
> > >>> I'd like to start a vote on FLIP-396 [1]. It covers enabling
> > > GitHub
> > >> Actions
> > >>> (GHA) in Apache Flink. This means that GHA workflows will run
> > > aside
> > > from
> > >>> the usual Azure CI workflows in a trial phase (which ends
> > > earliest
> >  with
> > >> the
> > >>> release of Flink 1.19). Azure CI will still serve as the
> > > project's
> > > ground
> > >>> of truth until the community decides in a final vote to switch
> > > to
> > >>> GHA
> > > or
> > >>> stick to Azure CI.
> > >>>
> > >>> The related discussion thread can be found in [2].
> > >>>
> > >>> The vote will remain open for at least 72 hours and only
> > > concluded
> > >>> if
> > >> there
> > >>> are no objections and enough (i.e. at least 3) binding votes.
> > >>>
> > >>> Matthias
> > >>>
> > >>> [1]
> > >>>
> > >>>
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure
> > >>> [2]
> > >>> https://lists.apache.org/thread/h4cmv7l3y8mxx2t435dmq4ltco4sbrgb
> > >>> --
> > >>>
> > >>> [image: Aiven]
> > >>>
> > >>> *Matthias Pohl*
> > >>> Opensource Software Engineer, *Aiven*
> > >>> matthias.p...@aiven.io  |  +49 170 9869525
> > >>> aiven.io|   <
> > >> https://www.facebook.com/aivencloud
> > >>>  <
> > https://twitter.com/aiven_io>
> > >>> *Aiven Deutschland GmbH*
> > >>> Alexanderufer 3-7, 10117 Berlin
> > >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > >>> Amtsgericht Charlottenburg, HRB 209739 B
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >>
> > >> Best,
> > >> Benchao Li
> >
> >


Re: [DISCUSS] Should Configuration support getting value based on String key?

2023-12-13 Thread Xuannan Su
Hi Rui,

We are currently revisiting all the configurations for Flink 2.0, and
it turns out that many string-based configurations in
`ConfigConstants` are deprecated and have been replaced by
`ConfigOptions`. Since `ConfigOptions` offers many advantages over
string-based configurations for the end user, I believe we should
encourage users to set and get the Flink configuration exclusively
with `ConfigOption`. And we are going to eventually replace all the
string-based configurations with `ConfigOptions` for this use case.

For the first use case you mentioned, I think they are all internal usage,
and we should aim to replace them with ConfigOptions gradually.
Meanwhile, we may consider making those getters/setters for internal
use only while the replacement is in progress.

For the second use case, IIUC, you need to iterate over all the
configuration entries to replace some old configuration keys with new ones.
I believe `toMap` is suitable for this scenario.
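
A sketch of that second use case, iterating over `toMap()` to rewrite keys; the old/new key prefixes are placeholders, not real Flink or kubernetes-operator keys.

```java
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class KeyMigrationSketch {

    /** Copies all entries, rewriting a deprecated key prefix; prefixes are illustrative. */
    public static Configuration migrate(Configuration source) {
        Configuration target = new Configuration();
        for (Map.Entry<String, String> entry : source.toMap().entrySet()) {
            String key = entry.getKey();
            String migratedKey =
                    key.startsWith("old.prefix.")
                            ? "new.prefix." + key.substring("old.prefix.".length())
                            : key;
            target.setString(migratedKey, entry.getValue());
        }
        return target;
    }
}
```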

Best,
Xuannan



On Wed, Dec 13, 2023 at 9:04 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Thanks Zhu for the quick response!
>
> > It is not a blocker of the deprecation, epsecially given that they are
> not standard
> > configuration and are just using Configuration class for convenience.
>
> Yes, it's not a blocker of deprecation.
>
> > These are internal usages and we can have an internal getter method for
> them.
>
> For case1, do you mean we reuse the old getString method as the internal
> getter method or add a new getter method?
>
> Anyway, it's fine for me if we have an internal getter method. As I
> understand,
> the public method without any annotation will be the internal method, right?
>
> > Not sure why it's required to convert all keys or values. If it is used
> > to create strings for dynamic properties or config files to deploy jobs,
> > toMap()/toFileWritableMap() may be a better choice. They are already
> > used in this kind of scenarios.
>
> For case 2, it's really a special scenario, and toMap() is fine for it.
> Case 2 uses getString instead of toMap because getString is easier.
> Also, kubernetes-operator is an internal usage, so if case 1 is solved,
> case 2 can also use the internal getter method. So we can focus on case 1.
>
> Thank you
>
> Best,
> Rui
>
> On Wed, Dec 13, 2023 at 8:01 PM Zhu Zhu  wrote:
>
> > Hi Rui,
> >
> > I'd like to understand why there is a strong requirement for these
> > deprecated
> > methods. The ConfigOption param methods help to do the type conversion so
> > that users do not need to do it by themselves.
> >
> > For the 2 reasons to keep these methods mentioned above:
> >
> > > 1. A lot of scenarios don't define a ConfigOption; they use
> > > String as the key and value directly, such as StreamConfig,
> > > TaskConfig, DistributedCache, etc.
> >
> > These are internal usages and we can have an internal getter method for
> > them.
> > It is not a blocker of the deprecation, especially given that they are not
> > standard
> > configuration and are just using the Configuration class for convenience.
> >
> > > 2. Some code wants to convert all keys or values; this conversion
> > > is generic, so getString(String key, String defaultValue) is needed.
> > > For example: kubernetes-operator [3].
> >
> > Not sure why it's required to convert all keys or values. If it is used
> > to create strings for dynamic properties or config files to deploy jobs,
> > toMap()/toFileWritableMap() may be a better choice. They are already
> > used in this kind of scenarios.
> > If it just needs to read some of the config, why not use the proposed
> > way to read and parse the config? Pre-defined ConfigOptions are better
> > for configuration maintenance, compared to arbitrary strings.
> >
> > Thanks,
> > Zhu
> >
> > Rui Fan <1996fan...@gmail.com> 于2023年12月13日周三 19:27写道:
> >
> >> Thanks Martijn for the quick clarification!
> >>
> >> I see Zhu Zhu and Junrui Lee are working on configuration related
> >> work of Flink-2.0. I would cc them, and hear some thoughts from them.
> >>
> >> Best,
> >> Rui
> >>
> >> On Wed, Dec 13, 2023 at 7:17 PM Martijn Visser 
> >> wrote:
> >>
> >>> Hi Rui,
> >>>
> >>> I'm more wondering if part of the configuration layer changes would
> >>> mean that these APIs would be removed in 2.0. Because if so, then I
> >>> don't think we should remove the Deprecate annotation. But I have very
> >>> little visibility on the plans for the configuration layer.
> >>>
> >>> Thanks,
> >>>
> >>> Martijn
> >>>
> >>> On Wed, Dec 13, 2023 at 12:15 PM Rui Fan <1996fan...@gmail.com> wrote:
> >>> >
> >>> > Hi Martijn,
> >>> >
> >>> > Thanks for your reply!
> >>> >
> >>> > I noticed that 2.0 is doing some work related to cleaning up configuration.
> >>> > But this discussion is different from that work. Most of the other work
> >>> > deprecates some APIs in Flink 1.19 and removes them in Flink 2.0.
> >>> >
> >>> > This discussion is about a series of methods that have been marked as
> >>> > @Deprecated, but they are still used so far. I propose remove 

[jira] [Created] (FLINK-33810) Propagate RecordAttributes that contains isProcessingBacklog status

2023-12-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33810:
--

 Summary: Propagate RecordAttributes that contains 
isProcessingBacklog status
 Key: FLINK-33810
 URL: https://issues.apache.org/jira/browse/FLINK-33810
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33399) Support switching from batch to stream mode for KeyedCoProcessOperator and IntervalJoinOperator

2023-10-30 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33399:
--

 Summary: Support switching from batch to stream mode for 
KeyedCoProcessOperator and IntervalJoinOperator
 Key: FLINK-33399
 URL: https://issues.apache.org/jira/browse/FLINK-33399
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Support switching from batch to stream mode for KeyedCoProcessOperator and 
IntervalJoinOperator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33398) Support switching from batch to stream mode for one input stream operator

2023-10-30 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33398:
--

 Summary: Support switching from batch to stream mode for one input 
stream operator
 Key: FLINK-33398
 URL: https://issues.apache.org/jira/browse/FLINK-33398
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Introduce the infra to support switching from batch to stream mode for one 
input stream operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-10-19 Thread Xuannan Su
Hi all,

There is still a discussion ongoing. I will cancel the vote for now.

Best Regards,
Xuannan

On Thu, Oct 19, 2023 at 2:08 PM Dong Lin  wrote:
>
> Thanks for the FLIP!
>
> +1 (binding)
>
> On Wed, Oct 18, 2023 at 10:25 AM Xuannan Su  wrote:
>
> > Hi all,
> >
> > We would like to start the vote for FLIP-328: Allow source operators
> > to determine isProcessingBacklog based on watermark lag [1]. This FLIP
> > was discussed in this thread [2].
> >
> > The vote will be open until at least Oct 21st (at least 72 hours),
> > following the consensus voting process.
> >
> > Cheers,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > [2] https://lists.apache.org/thread/kmtyqxsp35cobx3g0kwfn6ksxlq57m4t
> >


[VOTE] FLIP-329: Add operator attribute to specify support for object-reuse

2023-10-18 Thread Xuannan Su
Hi all,

We would like to start the vote for FLIP-329: Add operator attribute
to specify support for object-reuse[1]. This FLIP was discussed in
this thread [2].

The vote will be open until at least Oct 22nd (at least 72 hours),
following the consensus voting process.

Cheers,
Xuannan

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
[2] https://lists.apache.org/thread/2h2r68m7bwsnvd8w1m50rktd7w6mr5n4


[VOTE] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-10-17 Thread Xuannan Su
Hi all,

We would like to start the vote for FLIP-328: Allow source operators
to determine isProcessingBacklog based on watermark lag [1]. This FLIP
was discussed in this thread [2].

The vote will be open until at least Oct 21st (at least 72 hours),
following the consensus voting process.

Cheers,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
[2] https://lists.apache.org/thread/kmtyqxsp35cobx3g0kwfn6ksxlq57m4t


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-10-16 Thread Xuannan Su
Hi Martijn,

Do you have further comments regarding the FLIP? If not, I'd like to
move forward and start the voting in two days.

Best regards,
Xuannan


On Sat, Oct 7, 2023 at 3:02 PM Xuannan Su  wrote:
>
> Hi Martijn,
>
> Sorry for the late reply. I don't think it is feasible to always
> enable object reuse. If I understand correctly, object reuse is
> disabled by default to guarantee correctness because we cannot assume
> that the custom operator/function is safe to enable object reuse.
>
> The method proposed in the FLIP is to let the operator inform the
> Flink runtime whether it is safe to reuse the emitted records. It
> provides a fine-grained way of controlling the object reuse behavior
> at the operator level. In the long term, instead of always enabling
> object reuse, it is better to remove the object-reuse configuration
> and let the runtime determine whether to enable object reuse for each
> operator.
>
> I hope that addresses your question. Please let me know if you have
> further comments.
>
> Best regards,
> Xuannan
>
>
> On Fri, Sep 29, 2023 at 8:47 AM Martijn Visser  
> wrote:
> >
> > Hi Xuannan,
> >
> > I have one question more from a strategic point of view: given that
> > we're working on Flink 2.0, wouldn't we actually want to be in a
> > situation where object-reuse is always used and don't make it
> > configurable anymore? IIRC, the only reason why it's a configuration
> > is for backward compatibility.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, Sep 26, 2023 at 1:32 AM Xuannan Su  wrote:
> > >
> > > Hi all,
> > >
> > > We would like to revive the discussion and provide a quick update on
> > > the recent work of the FLIP. We have implemented a POC[1], run cases
> > > in the flink-benchmarks[2] against the POC, and verified that many of
> > > the operators in the benchmark will enable object-reuse without code
> > > changes, while the global object-reuse is disabled.
> > >
> > > Please let me know if you have any further comments on the FLIP. If
> > > there are no more comments, we will open the voting in 3 days.
> > >
> > > Best regards,
> > > Xuannan
> > >
> > > [1] https://github.com/apache/flink/pull/22897
> > > [2] https://github.com/apache/flink-benchmarks
> > >
> > >
> > > On Fri, Jul 7, 2023 at 9:18 AM Dong Lin  wrote:
> > > >
> > > > Hi Jing,
> > > >
> > > > Thank you for the suggestion. Yes, we can extend it to support null if 
> > > > in
> > > > the future we find any use-case for this flexibility.
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > > On Thu, Jul 6, 2023 at 7:55 PM Jing Ge  wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > one scenario I could imagine is that users could enable global object
> > > > > reuse features but force deep copy for some user defined specific 
> > > > > functions
> > > > > because of any limitations. But that is only my gut feeling. And 
> > > > > agree, we
> > > > > could keep the solution simple for now as FLIP described and upgrade 
> > > > > to 3VL
> > > > > once there are such real requirements that are rising.
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Thu, Jul 6, 2023 at 12:30 PM Dong Lin  wrote:
> > > > >
> > > > >> Hi Jing,
> > > > >>
> > > > >> Thank you for the detailed explanation. Please see my reply inline.
> > > > >>
> > > > >> On Thu, Jul 6, 2023 at 3:17 AM Jing Ge  wrote:
> > > > >>
> > > > >>> Hi Xuannan, Hi Dong,
> > > > >>>
> > > > >>> Thanks for your clarification.
> > > > >>>
> > > > >>> @Xuannan
> > > > >>>
> > > > >>> A Jira ticket has been created for the doc update:
> > > > >>> https://issues.apache.org/jira/browse/FLINK-32546
> > > > >>>
> > > > >>> @Dong
> > > > >>>
> > > > >>> I don't have a concrete example. I just thought about it from a
> > > > >>> conceptual or pattern's perspective. Since we have 1. 
> > > > >>> coarse-grained global

Re: [ANNOUNCE] New Apache Flink Committer - Jane Chan

2023-10-15 Thread Xuannan Su
Congratulations Jane!

Best,
Xuannan

On Mon, Oct 16, 2023 at 10:21 AM Yun Tang  wrote:
>
> Congratulations, Jane!
>
> Best
> Yun Tang
> 
> From: Rui Fan <1996fan...@gmail.com>
> Sent: Monday, October 16, 2023 10:16
> To: dev@flink.apache.org 
> Cc: qingyue@gmail.com 
> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Jane Chan
>
> Congratulations Jane!
>
> Best,
> Rui
>
> On Mon, Oct 16, 2023 at 10:15 AM yu zelin  wrote:
>
> > Congratulations!
> >
> > Best,
> > Yu Zelin
> >
> > > On Oct 16, 2023, at 09:58, Jark Wu  wrote:
> > >
> > > Hi, everyone
> > >
> > > On behalf of the PMC, I'm very happy to announce Jane Chan as a new Flink
> > > Committer.
> > >
> > > Jane started code contribution in Jan 2021 and has been active in the
> > Flink
> > > community since. She authored more than 60 PRs and reviewed more than 40
> > > PRs. Her contribution mainly revolves around Flink SQL, including Plan
> > > Advice (FLIP-280), operator-level state TTL (FLIP-292), and ALTER TABLE
> > > statements (FLINK-21634). Jane participated deeply in development
> > > discussions and also helped answer user question emails. Jane was also a
> > > core contributor of Flink Table Store (now Paimon) when the project was
> > in
> > > the early days.
> > >
> > > Please join me in congratulating Jane Chan for becoming a Flink
> > Committer!
> > >
> > > Best,
> > > Jark Wu (on behalf of the Flink PMC)
> >
> >


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-10-07 Thread Xuannan Su
Hi Martijn,

Sorry for the late reply. I don't think it is feasible to always
enable object reuse. If I understand correctly, object reuse is
disabled by default to guarantee correctness because we cannot assume
that the custom operator/function is safe to enable object reuse.

The method proposed in the FLIP is to let the operator inform the
Flink runtime whether it is safe to reuse the emitted records. It
provides a fine-grained way of controlling the object reuse behavior
at the operator level. In the long term, instead of always enabling
object reuse, it is better to remove the object-reuse configuration
and let the runtime determine whether to enable object reuse for each
operator.
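To illustrate, here is a minimal sketch of the decision the runtime could make
once the operator exposes such an attribute (hypothetical names, not the FLIP's
final API):

    public final class ObjectReuseDecision {

        /** Hypothetical per-operator attribute; assumed to default to false. */
        public interface OperatorAttributes {
            boolean isObjectReuseCompliant();
        }

        /**
         * Returns true if a record emitted by the operator may be handed to the
         * next operator in the chain without a deep copy.
         */
        public static boolean reuseOutput(boolean pipelineObjectReuse, OperatorAttributes attrs) {
            // Either the global pipeline.object-reuse config or the operator-level
            // attribute is enough to enable reuse; otherwise the record is deep-copied.
            return pipelineObjectReuse || attrs.isObjectReuseCompliant();
        }
    }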

I hope that addresses your question. Please let me know if you have
further comments.

Best regards,
Xuannan


On Fri, Sep 29, 2023 at 8:47 AM Martijn Visser  wrote:
>
> Hi Xuannan,
>
> I have one question more from a strategic point of view: given that
> we're working on Flink 2.0, wouldn't we actually want to be in a
> situation where object-reuse is always used and don't make it
> configurable anymore? IIRC, the only reason why it's a configuration
> is for backward compatibility.
>
> Best regards,
>
> Martijn
>
> On Tue, Sep 26, 2023 at 1:32 AM Xuannan Su  wrote:
> >
> > Hi all,
> >
> > We would like to revive the discussion and provide a quick update on
> > the recent work of the FLIP. We have implemented a POC[1], run cases
> > in the flink-benchmarks[2] against the POC, and verified that many of
> > the operators in the benchmark will enable object-reuse without code
> > changes, while the global object-reuse is disabled.
> >
> > Please let me know if you have any further comments on the FLIP. If
> > there are no more comments, we will open the voting in 3 days.
> >
> > Best regards,
> > Xuannan
> >
> > [1] https://github.com/apache/flink/pull/22897
> > [2] https://github.com/apache/flink-benchmarks
> >
> >
> > On Fri, Jul 7, 2023 at 9:18 AM Dong Lin  wrote:
> > >
> > > Hi Jing,
> > >
> > > Thank you for the suggestion. Yes, we can extend it to support null if in
> > > the future we find any use-case for this flexibility.
> > >
> > > Best,
> > > Dong
> > >
> > > On Thu, Jul 6, 2023 at 7:55 PM Jing Ge  wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > one scenario I could imagine is that users could enable global object
> > > > reuse features but force deep copy for some user defined specific 
> > > > functions
> > > > because of any limitations. But that is only my gut feeling. And agree, 
> > > > we
> > > > could keep the solution simple for now as FLIP described and upgrade to 
> > > > 3VL
> > > > once there are such real requirements that are rising.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Jul 6, 2023 at 12:30 PM Dong Lin  wrote:
> > > >
> > > >> Hi Jing,
> > > >>
> > > >> Thank you for the detailed explanation. Please see my reply inline.
> > > >>
> > > >> On Thu, Jul 6, 2023 at 3:17 AM Jing Ge  wrote:
> > > >>
> > > >>> Hi Xuannan, Hi Dong,
> > > >>>
> > > >>> Thanks for your clarification.
> > > >>>
> > > >>> @Xuannan
> > > >>>
> > > >>> A Jira ticket has been created for the doc update:
> > > >>> https://issues.apache.org/jira/browse/FLINK-32546
> > > >>>
> > > >>> @Dong
> > > >>>
> > > >>> I don't have a concrete example. I just thought about it from a
> > > >>> conceptual or pattern's perspective. Since we have 1. coarse-grained 
> > > >>> global
> > > >>> switch(CGS as abbreviation), i.e. the pipeline.object-reuse and 2.
> > > >>> fine-grained local switch(FGS as abbreviation), i.e. the
> > > >>> objectReuseCompliant variable for specific operators/functions, there 
> > > >>> will
> > > >>> be the following patterns with appropriate combinations:
> > > >>>
> > > >>> pattern 1: coarse-grained switch only. Local object reuse will be
> > > >>> controlled by the coarse-grained switch:
> > > >>> 1.1 cgs == true -> local object reused enabled
> > > >>> 1.2 cgs == true  -> local object reused enabled
> > > >>

[jira] [Created] (FLINK-33202) FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data

2023-10-07 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33202:
--

 Summary: FLIP-327: Support switching from batch to stream mode to 
improve throughput when processing backlog data
 Key: FLINK-33202
 URL: https://issues.apache.org/jira/browse/FLINK-33202
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Task
Reporter: Xuannan Su
 Fix For: 1.19.0


Umbrella issue for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-09-26 Thread Xuannan Su
 We have updated the FLIP as suggested.
> >>>>
> >>>>
> >>>>>
> >>>>> 2.
> >>>>>
> >>>>>-
> >>>>>
> >>>>>*If pipeline.object-reuse is set to true, records emitted by this
> >>>>>operator will be re-used.*
> >>>>>-
> >>>>>
> >>>>>*Otherwise, if getIsObjectReuseCompliant() returns true, records
> >>>>>emitted by this operator will be re-used.*
> >>>>>-
> >>>>>
> >>>>>*Otherwise, records emitted by this operator will be deep-copied
> >>>>>before being given to the next operator in the chain.*
> >>>>>
> >>>>>
> >>>>> If I understand you correctly,  the hard coding objectReusedCompliant
> >>>>> should have higher priority over the configuration, the checking logic
> >>>>> should be:
> >>>>>
> >>>>>-
> >>>>>
> >>>>>*If getIsObjectReuseCompliant() returns true, records emitted by
> >>>>>this operator will be re-used.*
> >>>>>-
> >>>>>
> >>>>>*Otherwise, if pipeline.object-reuse is set to true, records
> >>>>>emitted by this operator will be re-used.*
> >>>>>-
> >>>>>
> >>>>>*Otherwise, records emitted by this operator will be deep-copied
> >>>>>before being given to the next operator in the chain.*
> >>>>>
> >>>>>
> >>>>> The results are the same but the checking logics are different, but
> >>>>> there are some additional thoughts, which lead us to the next question.
> >>>>>
> >>>>
> >>>>>
> >>>>
> >>>>> 3. Current design lets specific operators enable object reuse and
> >>>>> ignore the global config. There could be another thought, on the 
> >>>>> contrary:
> >>>>> if an operator has hard coded the objectReuseCompliant as false, i.e.
> >>>>> disable object reuse on purpose, records should not be reused even if 
> >>>>> the
> >>>>> global config pipeline.object-reused is set to true, which turns out 
> >>>>> that
> >>>>> the objectReuseCompliant could be a triple value logic: true: force 
> >>>>> object
> >>>>> reusing; false: force deep-copying; unknown: depends on
> >>>>> pipeline.object-reuse config.
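A rough sketch of the three-valued variant suggested above (hypothetical names;
this is not what the FLIP currently proposes):

    public final class ThreeValuedObjectReuse {

        /** Hypothetical tri-state setting an operator could declare. */
        public enum ObjectReuse { FORCE_REUSE, FORCE_COPY, UNSET }

        public static boolean reuseOutput(boolean pipelineObjectReuse, ObjectReuse operatorSetting) {
            switch (operatorSetting) {
                case FORCE_REUSE:
                    return true;                  // operator explicitly allows reuse
                case FORCE_COPY:
                    return false;                 // operator explicitly forces deep copies
                default:
                    return pipelineObjectReuse;   // UNSET: fall back to pipeline.object-reuse
            }
        }
    }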
> >>>>>
> >>>>
> >>>> With the current proposal, if pipeline.object-reused == true and
> >>>> operatorA's objectReuseCompliant == false, Flink will not deep-copy
> >>>> operatorA's output. I think you are suggesting changing the behavior such
> >>>> that Flink should deep-copy the operatorA's output.
> >>>>
> >>>> Could you explain what is the advantage of this approach compared to
> >>>> the approach described in the FLIP?
> >>>>
> >>>> My concern with this approach is that it can cause performance
> >>>> regression. This is because an operator's objectReuseCompliant will be false by
> >>>> default unless it is explicitly overridden. For those jobs which are
> >>>> currently configured with pipeline.object-reused = true, these jobs will
> >>>> likely start to have lower performance (due to object deep-copy) after
> >>>> upgrading to the newer Flink version.
> >>>>
> >>>> Best,
> >>>> Dong
> >>>>
> >>>>
> >>>>>
> >>>>> Best regards,
> >>>>> Jing
> >>>>>
> >>>>>
> >>>>> [1] https://en.wikipedia.org/wiki/JavaBeans
> >>>>>
> >>>>> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su 
> >>>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> >>>>>> add operator attribute to allow operator to specify support for
> >>>>>> object-reuse [1].
> >>>>>>
> >>>>>> Currently, the default configuration for pipeline.object-reuse is set
> >>>>>> to false to avoid data corruption, which can result in suboptimal
> >>>>>> performance. We propose adding APIs that operators can utilize to
> >>>>>> inform the Flink runtime whether it is safe to reuse the emitted
> >>>>>> records. This enhancement would enable Flink to maximize its
> >>>>>> performance using the default configuration.
> >>>>>>
> >>>>>> Please refer to the FLIP document for more details about the proposed
> >>>>>> design and implementation. We welcome any feedback and opinions on
> >>>>>> this proposal.
> >>>>>>
> >>>>>> Best regards,
> >>>>>>
> >>>>>> Dong and Xuannan
> >>>>>>
> >>>>>> [1]
> >>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> >>>>>>
> >>>>>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-08 Thread Xuannan Su
Hi Jark and Leonard,

Thanks for the comments. Please see my reply below.

@Jark

> I think a better API doesn't compete with itself. Therefore, I'm in favor of
> supporting the watermark lag threshold for each source without introducing
> any framework API and configuration.

I don't think supporting the watermark lag threshold for each source
can avoid the competition problem. In the case where a user wants to
use a CDC source and also determine backlog status based on watermark
lag, we still need to define the rule when that occurs. With that
said, I think it is more intuitive to combine it with the logical OR
operation, as the strategies (FLIP-309, FLIP-328) only determine when
the source's backlog status should be True. What do you think?

> Besides, this can address another concern that the watermark may be
> generated by DataStream#assignTimestampsAndWatermarks which doesn't
> work with the backlog.watermark-lag-threshold job config

The description of the configuration explicitly states that "a source
would report isProcessingBacklog=true if its watermark lag exceeds the
configured value". It should not confuse the user that
DataStream#assignTimestampsAndWatermarks doesn't work with
backlog.watermark-lag-threshold, as it is not a source.

> Does that mean the job can never back to streaming mode once switches into
> backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> support switching back in this FLIP?

I think the description in the FLIP actually means the other way
around, where the job can never switch back to batch mode once it has
switched into streaming mode. This is to align with the current state
of FLIP-327[1], where only switching from batch to stream mode is
supported.

@Leonard

> > The FLIP describe that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”

I think the description is in FLIP-309. Thanks for pointing that out. I
have updated the description.

> I'm not sure if it's enough to support this feature only in FLIP-27 Source. 
> Although we are pushing the sourceFunction API to be removed, these APIs will 
> be survive one or two versions in flink repo before they are actually removed.

I agree that it is good to support the SourceFunction API. However,
given that the SourceFunction API is marked as deprecated, I think I
will prioritize supporting the FLIP-27 Source. We can support the
SourceFunction API after the
FLIP-27 source. What do you think?

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data




On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu  wrote:
>
> Thanks Xuannan for driving this FLIP !
>
> The proposal generally looks good to me, but I still left some comments:
>
> > One more question about the FLIP is that the FLIP says "Note that this
> > config does not support switching source's isProcessingBacklog from false 
> > to true
> > for now.” Does that mean the job can never back to streaming mode once 
> > switches into
> > backlog mode? It sounds like not a complete FLIP to me. Is it possible to
> > support switching back in this FLIP?
> +1 for Jark’s concern, IIUC, the state transition of IsProcessingBacklog 
> depends on whether the data in the source is processing backlog data or not. 
> Different sources will have different backlog status and which may change 
> over time. From a general perspective, we should not have this restriction.
>
> > The FLIP describe that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”
>
> I'm not sure if it's enough to support this feature only in FLIP-27 Source. 
> Although we are pushing the sourceFunction API to be removed, these APIs will 
> be survive one or two versions in flink repo before they are actually removed.
>
> Best,
> Leonard
>
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 7 Sept 2023 at 13:51, Xuannan Su  wrote:
> >
> >> Hi all,
> >>
> >> Thank you for all the reviews and suggestions.
> >>
> >> I believe all the comments have been addressed. If there are no
> >> further comments, I plan to open the voting thread for this FLIP early
> >> next week.
> >>
> >> Best regards,
> >> Xuannan
> >>
> >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge 
> >> wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> I thought FLIP-328 will compete with FLIP-309 while setting the value of
> >>> the backlog. Understood. Thanks for the hint.
> >>>

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
Hi all,

Thank you for all the reviews and suggestions.

I believe all the comments have been addressed. If there are no
further comments, I plan to open the voting thread for this FLIP early
next week.

Best regards,
Xuannan

On Thu, Sep 7, 2023 at 12:09 AM Jing Ge  wrote:
>
> Hi Xuannan,
>
> I thought FLIP-328 will compete with FLIP-309 while setting the value of
> the backlog. Understood. Thanks for the hint.
>
> Best regards,
> Jing
>
> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su  wrote:
>
> > Hi Jing,
> >
> > Thank you for the clarification.
> >
> > For the use case you mentioned, I believe we can utilize the
> > HybridSource, as updated in FLIP-309[1], to determine the backlog
> > status. For example, if the user wants to process data before time T
> > in batch mode and after time T in stream mode, they can set the first
> > source of the HybridSource to read up to time T and the last source of
> > the HybridSource to read from time T.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >
> >
> > On Mon, Sep 4, 2023 at 10:36 PM Jing Ge 
> > wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the clarification.
> > >
> > > 3. Event time and process time are two different things. It might be
> > rarely
> > > used, but conceptually, users can process data in the past within a
> > > specific time range in the streaming mode. All data before that range
> > will
> > > be considered as backlog and needed to be processed in the batch mode,
> > > like, e.g. the Present Perfect Progressive tense used in English
> > language.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su 
> > wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 1. You are absolutely right that the watermark lag threshold must be
> > > > carefully set with a thorough understanding of watermark generation.
> > It is
> > > > crucial for users to take into account the WatermarkStrategy when
> > setting
> > > > the watermark lag threshold.
> > > >
> > > > 2. Regarding pure processing-time based stream processing jobs,
> > > > alternative strategies will be implemented to determine whether the
> > job is
> > > > processing backlog data. I have outlined two possible strategies below:
> > > >
> > > > - Based on the source operator's state. For example, when MySQL CDC
> > source
> > > > is reading snapshot, it can claim isBacklog=true.
> > > > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > > isBacklog=true.
> > > >
> > > > As of the strategies proposed in this FLIP, it rely on generated
> > > > watermarks. Therefore, if a user intends for the job to detect backlog
> > > > status based on watermark, it is necessary to generate the watermark.
> > > >
> > > > 3. I'm afraid I'm not fully grasping your question. From my
> > understanding,
> > > > it should work in both cases. When event times are close to the
> > processing
> > > > time, resulting in watermarks close to the processing time, the job is
> > not
> > > > processing backlog data. On the other hand, when event times are far
> > from
> > > > processing time, causing watermarks to also be distant, if the lag
> > > > surpasses the defined threshold, the job is considered processing
> > backlog
> > > > data.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > >
> > > > > On Aug 31, 2023, at 02:56, Jing Ge 
> > wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > Thanks for the clarification. That is the part where I am trying to
> > > > > understand your thoughts. I have some follow-up questions:
> > > > >
> > > > > 1. It depends strongly on the watermarkStrategy and how customized
> > > > > watermark generation looks like. It mixes business logic with
> > technical
> > > > > implementation and technical data processing mode. The value of the

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-06 Thread Xuannan Su
Hi Jing,

Thank you for the clarification.

For the use case you mentioned, I believe we can utilize the
HybridSource, as updated in FLIP-309[1], to determine the backlog
status. For example, if the user wants to process data before time T
in batch mode and after time T in stream mode, they can set the first
source of the HybridSource to read up to time T and the last source of
the HybridSource to read from time T.
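A rough sketch of that setup, following the documented HybridSource pattern (the
file path, topic, and switch timestamp below are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.core.fs.Path;

    public class HistoryThenRealtime {
        public static HybridSource<String> build(long switchTimestampMillis) {
            // Bounded source that covers the historical data up to time T.
            FileSource<String> history =
                    FileSource.forRecordStreamFormat(
                                    new TextLineInputFormat(), new Path("/data/history"))
                            .build();
            // Unbounded source that starts reading from time T.
            KafkaSource<String> realtime =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("broker:9092")
                            .setTopics("events")
                            .setGroupId("backfill-demo")
                            .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestampMillis))
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            .build();
            return HybridSource.builder(history).addSource(realtime).build();
        }
    }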

Best,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog


On Mon, Sep 4, 2023 at 10:36 PM Jing Ge  wrote:
>
> Hi Xuannan,
>
> Thanks for the clarification.
>
> 3. Event time and process time are two different things. It might be rarely
> used, but conceptually, users can process data in the past within a
> specific time range in the streaming mode. All data before that range will
> be considered as backlog and needed to be processed in the batch mode,
> like, e.g. the Present Perfect Progressive tense used in English language.
>
> Best regards,
> Jing
>
> On Thu, Aug 31, 2023 at 4:45 AM Xuannan Su  wrote:
>
> > Hi Jing,
> >
> > Thanks for the reply.
> >
> > 1. You are absolutely right that the watermark lag threshold must be
> > carefully set with a thorough understanding of watermark generation. It is
> > crucial for users to take into account the WatermarkStrategy when setting
> > the watermark lag threshold.
> >
> > 2. Regarding pure processing-time based stream processing jobs,
> > alternative strategies will be implemented to determine whether the job is
> > processing backlog data. I have outlined two possible strategies below:
> >
> > - Based on the source operator's state. For example, when MySQL CDC source
> > is reading snapshot, it can claim isBacklog=true.
> > - Based on metrics. For example, when busyTimeMsPerSecond (or
> > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > isBacklog=true.
> >
> > As of the strategies proposed in this FLIP, it rely on generated
> > watermarks. Therefore, if a user intends for the job to detect backlog
> > status based on watermark, it is necessary to generate the watermark.
> >
> > 3. I'm afraid I'm not fully grasping your question. From my understanding,
> > it should work in both cases. When event times are close to the processing
> > time, resulting in watermarks close to the processing time, the job is not
> > processing backlog data. On the other hand, when event times are far from
> > processing time, causing watermarks to also be distant, if the lag
> > surpasses the defined threshold, the job is considered processing backlog
> > data.
> >
> > Best,
> > Xuannan
> >
> >
> > > On Aug 31, 2023, at 02:56, Jing Ge  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the clarification. That is the part where I am trying to
> > > understand your thoughts. I have some follow-up questions:
> > >
> > > 1. It depends strongly on the watermarkStrategy and how customized
> > > watermark generation looks like. It mixes business logic with technical
> > > implementation and technical data processing mode. The value of the
> > > watermark lag threshold must be set very carefully. If the value is too
> > > small. any time, when the watermark generation logic is changed(business
> > > logic changes lead to the threshold getting exceeded), the same job might
> > > be running surprisingly in backlog processing mode, i.e. a butterfly
> > > effect. A comprehensive documentation is required to avoid any confusion
> > > for the users.
> > > 2. Like Jark already mentioned, use cases that do not have watermarks,
> > > like pure processing-time based stream processing[1] are not covered. It
> > is
> > > more or less a trade-off solution that does not support such use cases
> > and
> > > appropriate documentation is required. Forcing them to explicitly
> > generate
> > > watermarks that are never needed just because of this does not sound
> > like a
> > > proper solution.
> > > 3. If I am not mistaken, it only works for use cases where event times
> > are
> > > very close to the processing times, because the wall clock is used to
> > > calculate the watermark lag and the watermark is generated based on the
> > > event time.
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1]
> > >
> > https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/s

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-31 Thread Xuannan Su
Hi Hang,

Thanks for the review.

This is a good question. It appears that this particular piece of
information is absent from the FLIP document. If I understand
correctly, the term "From the source" refers to a source that
determines the backlog status based on its state. Both backlog
statuses are determined at the source operator: one based on the
watermark lag and one based on the source state. Only when both are
false does the source operator send the signal "isBacklog=false"
downstream. Essentially, the backlog statuses are combined using a
logical OR operation. I have updated the configuration description to
describe this behavior.
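To picture the combination, a tiny illustration (not actual Flink code):

    public final class BacklogStatus {

        /**
         * isBacklog=false is only sent downstream when both signals are false,
         * i.e. the two statuses are combined with a logical OR.
         */
        public static boolean isProcessingBacklog(
                boolean backlogFromSourceState,  // e.g. a CDC source still reading its snapshot
                long watermarkLagMillis,
                long lagThresholdMillis) {
            boolean backlogFromWatermarkLag = watermarkLagMillis > lagThresholdMillis;
            return backlogFromSourceState || backlogFromWatermarkLag;
        }
    }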

I hope that addresses your question.

Best,
Xuannan


On Wed, Aug 30, 2023 at 4:31 PM Hang Ruan  wrote:
>
> Hi, Xuannan.
>
> Thanks for preparing the FLIP.
>
> After this FLIP, we will have two ways to report isProcessingBacklog: 1.
> From the source; 2. Judged by the watermark lag. What is the priority
> between them?
> For example, what is the status isProcessingBacklog when the source report
> `isProcessingBacklog=false` and the watermark lag exceeds the threshold?
>
> Best,
> Hang
>
> Xuannan Su  wrote on Wed, Aug 30, 2023 at 10:06:
>
> > Hi Jing,
> >
> > Thank you for the suggestion.
> >
> > The definition of watermark lag is the same as the watermarkLag metric in
> > FLIP-33[1]. More specifically, the watermark lag calculation is computed at
> > the time when a watermark is emitted downstream in the following way:
> > watermarkLag = CurrentTime - Watermark. I have added this description to
> > the FLIP.
> >
> > I hope this addresses your concern.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> > > On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the proposal. +1 for me.
> > >
> > > There is one tiny thing that I am not sure if I understand it correctly.
> > > Since there will be many different WatermarkStrategies and different
> > > WatermarkGenerators. Could you please update the FLIP and add the
> > > description of how the watermark lag is calculated exactly? E.g.
> > Watermark
> > > lag = A - B with A is the timestamp of the watermark emitted to the
> > > downstream and B is(this is the part I am not really sure after
> > reading
> > > the FLIP).
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
> > wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> Thanks for the comments.
> > >>
> > >> I agree that the current solution cannot support jobs that cannot define
> > >> watermarks. However, after considering the pending-record-based
> > solution, I
> > >> believe the current solution is superior for the target use case as it
> > is
> > >> more intuitive for users. The backlog status gives users the ability to
> > >> balance between throughput and latency. Making this trade-off decision
> > >> based on the watermark lag is more intuitive from the user's
> > perspective.
> > >> For instance, a user can decide that if the job lags behind the current
> > >> time by more than 1 hour, the result is not usable. In that case, we can
> > >> optimize for throughput when the data lags behind by more than an hour.
> > >> With the pending-record-based solution, it's challenging for users to
> > >> determine when to optimize for throughput and when to prioritize
> > latency.
> > >>
> > >> Regarding the limitations of the watermark-based solution:
> > >>
> > >> 1. The current solution can support jobs with sources that have event
> > >> time. Users can always define a watermark at the source operator, even
> > if
> > >> it's not used by downstream operators, such as streaming join and
> > unbounded
> > >> aggregate.
> > >>
> > >> 2.I don't believe it's accurate to say that the watermark lag will keep
> > >> increasing if no data is generated in Kafka. The watermark lag and
> > backlog
> > >> status are determined at the moment when the watermark is emitted to the
> > >> downstream operator. If no data is emitted from the source, the
> > watermark
> > >> lag and backlog status will not be updated. If the WatermarkStrategy
> > with
> > >> idleness

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-31 Thread Xuannan Su
Hi all,

I would like to share some updates on FLIP-327. Dong and I have had a
series of discussions and have made several refinements to the FLIP.

The major change to the FLIP is to allow the input of the one-input
operator to be automatically sorted during backlog processing. When
combined with the state backend optimization introduced in FLIP-325 [1],
all the keyed single-input operators can achieve similar performance as in
batch mode during backlog processing without any code change to the
operator. We also implemented a POC[2] and conducted a benchmark[3] using the
KeyedStream#reduce operation. The benchmark results demonstrate the
performance gains that this FLIP can offer.
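For reference, the benchmarked pattern is essentially a keyed reduce like the
sketch below; with this FLIP, the input of such a keyed one-input operator can
be sorted during backlog processing without any change to the user code:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyedReduceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
                    .keyBy(value -> value.f0)
                    // With this FLIP, the input of this keyed one-input operator can be
                    // sorted by key during backlog processing, giving batch-like performance.
                    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                    .print();
            env.execute("keyed-reduce-example");
        }
    }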

I am looking forward to any comments or feedback you may have on this FLIP.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
[2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
[3]
https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java



> On Aug 18, 2023, at 21:28, Dong Lin  wrote:
>
> Hi Piotr,
>
> Thanks for the explanation.
>
> To recap our offline discussion, there is a concern regarding the
> capability to dynamically switch between stream and batch modes. This
> concern is around unforeseen behaviors such as bugs or performance
> regressions, which we might not be aware of yet. The reason for this
> concern is that this feature involves a fundamental impact on the Flink
> runtime's behavior.
>
> Due to the above concern, I agree it is reasonable to annotate related
APIs
> as experimental. This step would provide us with the flexibility to modify
> these APIs if issues arise in the future. This annotation also serves as a
> note to users that this functionality might not perform well as expected.
>
> Though I believe that we can ensure the reliability of this feature
through
> good design and code reviews, comprehensive unit tests, and thorough
> integration testing, I agree that it is reasonable to be extra cautious in
> this case. Also, it should be OK to delay making these APIs as
> non-experimental by 1-2 releases.
>
> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs
> as experimental. Please let me know if you think any other API should also
> be marked as experimental.
>
> Thanks!
> Dong
>
> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski 
> wrote:
>
>> Hi Dong,
>>
>> Operators API is unfortunately also our public facing API and I mean the
>> APIs that we will add there should also be marked `@Experimental` IMO.
>>
>> The config options should also be marked as experimental (both
>> annotated @Experimental and noted the same thing in the docs,
>> if @Experimental annotation is not automatically mentioned in the docs).
>>
>>> Alternatively, how about we add a doc for
>> checkpointing.interval-during-backlog explaining its impact/concern as
>> discussed above?
>>
>> We should do this independently from marking the APIs/config options as
>> `@Experimental`
>>
>> Best,
>> Piotrek
>>
>> pt., 11 sie 2023 o 14:55 Dong Lin  napisał(a):
>>
>>> Hi Piotr,
>>>
>>> Thanks for the reply!
>>>
>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski >>
>>> wrote:
>>>
 Hi,

 Sorry for the long delay in responding!

> Given that it is an optional feature that can be
> turned off by users, it might be OK to just let users try it out and
>> we
 can
> fix performance issues once we detect any of them. What do you think?

 I think it's fine. It would be best to mark this feature as
>> experimental,
 and
 we say that the config keys or the default values might change in the
 future.

>>>
>>> In general I agree we can mark APIs that determine "whether to enable
>>> dynamic switching between stream/batch mode" as experimental.
>>>
>>> However, I am not sure we have such an API yet. The APIs added in this
>> FLIP
>>> are intended to be used by operator developers rather than end users.
End
>>> users can enable this capability by setting
>>> execution.checkpointing.interval-during-backlog = Long.MAX and using a
>>> source which might implicitly set the backlog status (e.g. HybridSource). So
>>> execution.checkpointing.interval-during-backlog is the only user-facing
>>> APIs that can always control whether this feature can be used.
>>>
>>> However, execution.checkpointing.interval-during-backlog itself is not
>> tied
>>> to FLIP-327.
>>>
>>> Do you mean we should set checkpointing.interval-during-backlog as
>>> experimental? Alternatively, how about we add a doc for
>>> checkpointing.interval-during-backlog explaining its impact/concern as
>>> discussed above?
>>>
>>> Best,
>>> Dong
>>>
>>>
> Maybe we can revisit the need for such a config when we
>>> introduce/discuss
> the capability to switch backlog from false to true in the 

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-30 Thread Xuannan Su
Hi Jing,

Thanks for the reply.

1. You are absolutely right that the watermark lag threshold must be carefully 
set with a thorough understanding of watermark generation. It is crucial for 
users to take into account the WatermarkStrategy when setting the watermark lag 
threshold.

2. Regarding pure processing-time based stream processing jobs, alternative 
strategies will be implemented to determine whether the job is processing 
backlog data. I have outlined two possible strategies below:

- Based on the source operator's state. For example, when MySQL CDC source is 
reading snapshot, it can claim isBacklog=true.
- Based on metrics. For example, when busyTimeMsPerSecond (or 
backPressuredTimeMsPerSecond) > user_specified_threshold, then isBacklog=true.
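Purely as an illustration (these are not Flink APIs), the two strategies boil
down to predicates like the following:

    public final class ProcessingTimeBacklogStrategies {

        /** Strategy 1: derive the status from the source's own state. */
        public static boolean backlogFromSourceState(boolean sourceStillReadingSnapshot) {
            return sourceStillReadingSnapshot;   // e.g. a MySQL CDC source in its snapshot phase
        }

        /** Strategy 2: derive the status from runtime metrics. */
        public static boolean backlogFromMetrics(
                double busyTimeMsPerSecond, double userSpecifiedThreshold) {
            return busyTimeMsPerSecond > userSpecifiedThreshold;
        }
    }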

As for the strategies proposed in this FLIP, they rely on generated watermarks. 
Therefore, if a user intends for the job to detect the backlog status based on 
the watermark, it is necessary to generate watermarks.

3. I'm afraid I'm not fully grasping your question. From my understanding, it 
should work in both cases. When event times are close to the processing time, 
resulting in watermarks close to the processing time, the job is not processing 
backlog data. On the other hand, when event times are far from processing time, 
causing watermarks to also be distant, if the lag surpasses the defined 
threshold, the job is considered processing backlog data.

Best,
Xuannan


> On Aug 31, 2023, at 02:56, Jing Ge  wrote:
> 
> Hi Xuannan,
> 
> Thanks for the clarification. That is the part where I am trying to
> understand your thoughts. I have some follow-up questions:
> 
> 1. It depends strongly on the watermarkStrategy and how customized
> watermark generation looks like. It mixes business logic with technical
> implementation and technical data processing mode. The value of the
> watermark lag threshold must be set very carefully. If the value is too
> small. any time, when the watermark generation logic is changed(business
> logic changes lead to the threshold getting exceeded), the same job might
> be running surprisingly in backlog processing mode, i.e. a butterfly
> effect. A comprehensive documentation is required to avoid any confusion
> for the users.
> 2. Like Jark already mentioned, use cases that do not have watermarks,
> like pure processing-time based stream processing[1] are not covered. It is
> more or less a trade-off solution that does not support such use cases and
> appropriate documentation is required. Forcing them to explicitly generate
> watermarks that are never needed just because of this does not sound like a
> proper solution.
> 3. If I am not mistaken, it only works for use cases where event times are
> very close to the processing times, because the wall clock is used to
> calculate the watermark lag and the watermark is generated based on the
> event time.
> 
> Best regards,
> Jing
> 
> [1]
> https://github.com/apache/flink/blob/2c50b4e956305426f478b726d4de4a640a16b810/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L236
> 
> On Wed, Aug 30, 2023 at 4:06 AM Xuannan Su  wrote:
> 
>> Hi Jing,
>> 
>> Thank you for the suggestion.
>> 
>> The definition of watermark lag is the same as the watermarkLag metric in
>> FLIP-33[1]. More specifically, the watermark lag calculation is computed at
>> the time when a watermark is emitted downstream in the following way:
>> watermarkLag = CurrentTime - Watermark. I have added this description to
>> the FLIP.
>> 
>> I hope this addresses your concern.
>> 
>> Best,
>> Xuannan
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>> 
>> 
>>> On Aug 28, 2023, at 01:04, Jing Ge  wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> Thanks for the proposal. +1 for me.
>>> 
>>> There is one tiny thing that I am not sure if I understand it correctly.
>>> Since there will be many different WatermarkStrategies and different
>>> WatermarkGenerators. Could you please update the FLIP and add the
>>> description of how the watermark lag is calculated exactly? E.g.
>> Watermark
>>> lag = A - B with A is the timestamp of the watermark emitted to the
>>> downstream and B is(this is the part I am not really sure after
>> reading
>>> the FLIP).
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> 
>>> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
>> wrote:
>>> 
>>>> Hi Jark,
>>>> 
>>>> Thanks for the comments.
>>>> 
>>>> I agree that the current solution cannot support jobs that

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-29 Thread Xuannan Su
Hi Jing,

Thank you for the suggestion.

The definition of watermark lag is the same as the watermarkLag metric in 
FLIP-33[1]. More specifically, the watermark lag is computed at the 
time when a watermark is emitted downstream, in the following way: watermarkLag 
= CurrentTime - Watermark. I have added this description to the FLIP.
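In code form, the check amounts to something like this (illustrative only, not
the actual implementation):

    public final class WatermarkLagCheck {

        /**
         * Computed when a watermark is emitted downstream:
         * watermarkLag = CurrentTime - Watermark.
         */
        public static boolean exceedsThreshold(long watermarkMillis, long thresholdMillis) {
            long watermarkLag = System.currentTimeMillis() - watermarkMillis;
            // If true, the source would report isProcessingBacklog=true.
            return watermarkLag > thresholdMillis;
        }
    }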

I hope this addresses your concern.

Best, 
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


> On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> 
> Hi Xuannan,
> 
> Thanks for the proposal. +1 for me.
> 
> There is one tiny thing that I am not sure if I understand it correctly.
> Since there will be many different WatermarkStrategies and different
> WatermarkGenerators. Could you please update the FLIP and add the
> description of how the watermark lag is calculated exactly? E.g. Watermark
> lag = A - B with A is the timestamp of the watermark emitted to the
> downstream and B is(this is the part I am not really sure after reading
> the FLIP).
> 
> Best regards,
> Jing
> 
> 
> On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su  wrote:
> 
>> Hi Jark,
>> 
>> Thanks for the comments.
>> 
>> I agree that the current solution cannot support jobs that cannot define
>> watermarks. However, after considering the pending-record-based solution, I
>> believe the current solution is superior for the target use case as it is
>> more intuitive for users. The backlog status gives users the ability to
>> balance between throughput and latency. Making this trade-off decision
>> based on the watermark lag is more intuitive from the user's perspective.
>> For instance, a user can decide that if the job lags behind the current
>> time by more than 1 hour, the result is not usable. In that case, we can
>> optimize for throughput when the data lags behind by more than an hour.
>> With the pending-record-based solution, it's challenging for users to
>> determine when to optimize for throughput and when to prioritize latency.
>> 
>> Regarding the limitations of the watermark-based solution:
>> 
>> 1. The current solution can support jobs with sources that have event
>> time. Users can always define a watermark at the source operator, even if
>> it's not used by downstream operators, such as streaming join and unbounded
>> aggregate.
>> 
>> 2.I don't believe it's accurate to say that the watermark lag will keep
>> increasing if no data is generated in Kafka. The watermark lag and backlog
>> status are determined at the moment when the watermark is emitted to the
>> downstream operator. If no data is emitted from the source, the watermark
>> lag and backlog status will not be updated. If the WatermarkStrategy with
>> idleness is used, the source becomes non-backlog when it becomes idle.
>> 
>> 3. I think watermark lag is more intuitive to determine if a job is
>> processing backlog data. Even when using pending records, it faces a
>> similar issue. For example, if the source has 1K pending records, those
>> records can span from 1 day  to 1 hour to 1 second. If the records span 1
>> day, it's probably best to optimize for throughput. If they span 1 hour, it
>> depends on the business logic. If they span 1 second, optimizing for
>> latency is likely the better choice.
>> 
>> In summary, I believe the watermark-based solution is a superior choice
>> for the target use case where watermark/event time can be defined.
>> Additionally, I haven't come across a scenario that requires low-latency
>> processing and reads from a source that cannot define watermarks. If we
>> encounter such a use case, we can create another FLIP to address those
>> needs in the future. What do you think?
>> 
>> 
>> Best,
>> Xuannan
>> 
>> 
>> 
>>> On Aug 20, 2023, at 23:27, Jark Wu > imj...@gmail.com>> wrote:
>>> 
>>> Hi Xuannan,
>>> 
>>> Thanks for opening this discussion.
>>> 
>>> This current proposal may work in the mentioned watermark cases.
>>> However, it seems this is not a general solution for sources to determine
>>> "isProcessingBacklog".
>>> From my point of view, there are 3 limitations of the current proposal:
>>> 1. It doesn't cover jobs that don't have watermark/event-time defined,
>>> for example streaming join and unbounded aggregate. We may still need to
>>> figure out solutions for them.
>>> 2. Watermark lag can not be trusted, because it increases unlimited if no
>>> data is generated in the Kafka.
>>> But in this case, there

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-21 Thread Xuannan Su
Hi Jark,

Thanks for the comments.

I agree that the current solution cannot support jobs that cannot define 
watermarks. However, after considering the pending-record-based solution, I 
believe the current solution is superior for the target use case as it is more 
intuitive for users. The backlog status gives users the ability to balance 
between throughput and latency. Making this trade-off decision based on the 
watermark lag is more intuitive from the user's perspective. For instance, a 
user can decide that if the job lags behind the current time by more than 1 
hour, the result is not usable. In that case, we can optimize for throughput 
when the data lags behind by more than an hour. With the pending-record-based 
solution, it's challenging for users to determine when to optimize for 
throughput and when to prioritize latency.

Regarding the limitations of the watermark-based solution:

1. The current solution can support jobs with sources that have event time. 
Users can always define a watermark at the source operator, even if it's not 
used by downstream operators, such as streaming join and unbounded aggregate.

2. I don't believe it's accurate to say that the watermark lag will keep 
increasing if no data is generated in Kafka. The watermark lag and backlog 
status are determined at the moment when the watermark is emitted to the 
downstream operator. If no data is emitted from the source, the watermark lag 
and backlog status will not be updated. If the WatermarkStrategy with idleness 
is used, the source becomes non-backlog when it becomes idle.
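For example, a strategy with idleness can be built as below; once the source is
marked idle, it is treated as non-backlog instead of letting the watermark lag
grow:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class IdleAwareWatermarks {
        public static <T> WatermarkStrategy<T> strategy() {
            return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    // Mark the source idle after 1 minute without records.
                    .withIdleness(Duration.ofMinutes(1));
        }
    }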

3. I think watermark lag is more intuitive to determine if a job is processing 
backlog data. Even when using pending records, it faces a similar issue. For 
example, if the source has 1K pending records, those records can span from 1 
day  to 1 hour to 1 second. If the records span 1 day, it's probably best to 
optimize for throughput. If they span 1 hour, it depends on the business logic. 
If they span 1 second, optimizing for latency is likely the better choice.

In summary, I believe the watermark-based solution is a superior choice for the 
target use case where watermark/event time can be defined. Additionally, I 
haven't come across a scenario that requires low-latency processing and reads 
from a source that cannot define watermarks. If we encounter such a use case, 
we can create another FLIP to address those needs in the future. What do you 
think?


Best,
Xuannan



> On Aug 20, 2023, at 23:27, Jark Wu  <mailto:imj...@gmail.com>> wrote:
> 
> Hi Xuannan,
> 
> Thanks for opening this discussion.
> 
> This current proposal may work in the mentioned watermark cases.
> However, it seems this is not a general solution for sources to determine
> "isProcessingBacklog".
> From my point of view, there are 3 limitations of the current proposal:
> 1. It doesn't cover jobs that don't have watermark/event-time defined,
> for example streaming join and unbounded aggregate. We may still need to
> figure out solutions for them.
> 2. Watermark lag can not be trusted, because it increases unlimited if no
> data is generated in the Kafka.
> But in this case, there is no backlog at all.
> 3. Watermark lag is hard to reflect the amount of backlog. If the watermark
> lag is 1day or 1 hour or 1second,
> there is possibly only 1 pending record there, which means no backlog at
> all.
> 
> Therefore, IMO, watermark maybe not the ideal metric used to determine
> "isProcessingBacklog".
> What we need is something that reflects the number of records unprocessed
> by the job.
> Actually, that is the "pendingRecords" metric proposed in FLIP-33 and has
> been implemented by Kafka source.
> Did you consider using "pendingRecords" metric to determine
> "isProcessingBacklog"?
> 
> Best,
> Jark
> 
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>  
> 
> 
> 
> On Tue, 15 Aug 2023 at 12:04, Xintong Song  <mailto:tonysong...@gmail.com>> wrote:
> 
>> Sounds good to me.
>> 
>> It is true that, if we are introducing the generalized watermark, there
>> will be other watermark related concepts / configurations that need to be
>> updated anyway.
>> 
>> 
>> Best,
>> 
>> Xintong
>> 
>> 
>> 
>> On Tue, Aug 15, 2023 at 11:30 AM Xuannan Su > <mailto:suxuanna...@gmail.com>> wrote:
>> 
>>> Hi Xingtong,
>>> 
>>> Thank you for your suggestion.
>>> 
>>> After considering the idea of using a general configuration key, I think
>>> it may not be a good idea for the reasons below.
>>>

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-14 Thread Xuannan Su
Hi Xingtong,

Thank you for your suggestion.

After considering the idea of using a general configuration key, I think it may 
not be a good idea for the reasons below.

While I agree that using a more general configuration key provides us with the 
flexibility to switch to other approaches to calculate the lag in the future, 
the downside is that it may cause confusion for users. We currently have 
fetchEventTimeLag, emitEventTimeLag, and watermarkLag in the source, and it is 
not clear which specific lag we are referring to. With the potential 
introduction of the Generalized Watermark mechanism in the future, if I 
understand correctly, a watermark won't necessarily need to be a timestamp. I 
am concerned that the general configuration key may not be enough to cover all 
the use cases, and we will need to introduce a general way to determine the 
backlog status regardless.

For the reasons above, I prefer introducing the configuration as is, and changing 
it later with a deprecation or migration process. What do you think?

Best,
Xuannan
On Aug 14, 2023, 14:09 +0800, Xintong Song , wrote:
> Thanks for the explanation.
>
> I wonder if it makes sense to not expose this detail via the configuration
> option. To be specific, I suggest not mentioning the "watermark" keyword in
> the configuration key and description.
>
> - From the users' perspective, I think they only need to know there's a
> lag higher than the given threshold, Flink will consider latency of
> individual records as less important and prioritize throughput over it.
> They don't really need the details of how the lags are calculated.
> - For the internal implementation, I also think using watermark lags is
> a good idea, for the reasons you've already mentioned. However, it's not
> the only possible option. Hiding this detail from users would give us the
> flexibility to switch to other approaches if needed in future.
> - We are currently working on designing the ProcessFunction API
> (consider it as a DataStream API V2). There's an idea to introduce a
> Generalized Watermark mechanism, where basically the watermark can be
> anything that needs to travel along the data-flow with certain alignment
> strategies, and event time watermark would be one specific case of it. This
> is still an idea and has not been discussed and agreed on by the community,
> and we are preparing a FLIP for it. But if we are going for it, the concept
> "watermark-lag-threshold" could be ambiguous.
>
> I do not intend to block the FLIP on this. I'd also be fine with
> introducing the configuration as is, and changing it later, if needed, with
> a regular deprecation and migration process. Just making my suggestions.
>
>
> Best,
>
> Xintong
>
>
>
> On Mon, Aug 14, 2023 at 12:00 PM Xuannan Su  wrote:
>
> > Hi Xintong,
> >
> > Thanks for the reply.
> >
> > I have considered using the timestamp in the records to determine the
> > backlog status, and decided to use watermark at the end. By definition,
> > watermark is the time progress indication in the data stream. It indicates
> > the stream’s event time has progressed to some specific time. On the other
> > hand, timestamp in the records is usually used to generate the watermark.
> > Therefore, it appears more appropriate and intuitive to calculate the event
> > time lag by watermark and determine the backlog status. And by using the
> > watermark, we can easily deal with the out-of-order and the idleness of the
> > data.
> >
> > Please let me know if you have further questions.
> >
> > Best,
> > Xuannan
> > On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> > > Thanks for preparing the FLIP, Xuannan.
> > >
> > > +1 in general.
> > >
> > > A quick question, could you explain why we are relying on the watermark
> > for
> > > emitting the record attribute? Why not use timestamps in the records? I
> > > don't see any concern in using watermarks. Just wondering if there's any
> > > deep considerations behind this.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I am opening this thread to discuss FLIP-328: Allow source operators to
> > > > determine isProcessingBacklog based on watermark lag[1]. We had several
> > > > discussions with Dong Ling about the design, and thanks for all the
> > > > valuable advice.
> > > >
> > > > The FLIP aims to target the use case where users want to run a Flink
> > > > job

Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-13 Thread Xuannan Su
Hi Xintong,

Thanks for the reply.

I have considered using the timestamps in the records to determine the backlog
status, and decided to use the watermark in the end. By definition, the
watermark is the time progress indication in the data stream. It indicates that
the stream's event time has progressed to some specific time. On the other
hand, the timestamps in the records are usually used to generate the watermark.
Therefore, it appears more appropriate and intuitive to calculate the event-time
lag from the watermark and determine the backlog status. By using the watermark,
we can also easily deal with out-of-order data and idleness in the stream.
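
To make this concrete, here is a minimal sketch of the check, assuming a
configured lag threshold and a hypothetical hook for reporting the status to
the runtime (the names below are illustrative, not the API proposed in the FLIP):

    // Minimal sketch only; class, method, and threshold names are illustrative.
    public class WatermarkLagBacklogDetector {

        private final long backlogLagThresholdMs;
        private boolean lastReportedBacklog;

        public WatermarkLagBacklogDetector(long backlogLagThresholdMs) {
            this.backlogLagThresholdMs = backlogLagThresholdMs;
        }

        /** Called whenever the source observes a new watermark. */
        public void onWatermark(long watermarkTimestampMs) {
            long watermarkLagMs = System.currentTimeMillis() - watermarkTimestampMs;
            boolean isBacklog = watermarkLagMs > backlogLagThresholdMs;
            if (isBacklog != lastReportedBacklog) {
                lastReportedBacklog = isBacklog;
                reportIsProcessingBacklog(isBacklog); // hypothetical reporting hook
            }
        }

        private void reportIsProcessingBacklog(boolean isBacklog) {
            // In the actual proposal this information goes to the runtime, which
            // can then, for example, switch to a larger checkpointing interval.
            System.out.println("isProcessingBacklog = " + isBacklog);
        }
    }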

Please let me know if you have further questions.

Best,
Xuannan
On Aug 10, 2023, 20:23 +0800, Xintong Song , wrote:
> Thanks for preparing the FLIP, Xuannan.
>
> +1 in general.
>
> A quick question: could you explain why we are relying on the watermark for
> emitting the record attribute? Why not use timestamps in the records? I
> don't see any concern in using watermarks. Just wondering if there are any
> deep considerations behind this.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Aug 3, 2023 at 3:03 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I am opening this thread to discuss FLIP-328: Allow source operators to
> > determine isProcessingBacklog based on watermark lag[1]. We had several
> > discussions with Dong Ling about the design, and thanks for all the
> > valuable advice.
> >
> > The FLIP aims to target the use case where users want to run a Flink job to
> > backfill historical data in a high-throughput manner and continue
> > processing real-time data with low latency. Building upon the backlog
> > concept introduced in FLIP-309[2], this proposal enables sources to report
> > their status of processing backlog based on the watermark lag.
> >
> > We would greatly appreciate any comments or feedback you may have on this
> > proposal.
> >
> > Best,
> > Xuannan
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> >


[DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-03 Thread Xuannan Su
Hi all,

I am opening this thread to discuss FLIP-328: Allow source operators to 
determine isProcessingBacklog based on watermark lag[1]. We had several
discussions with Dong Ling about the design, and thanks for all the valuable 
advice.

The FLIP aims to target the use case where users want to run a Flink job to
backfill historical data in a high-throughput manner and continue processing
real-time data with low latency. Building upon the backlog concept introduced 
in FLIP-309[2], this proposal enables sources to report their status of 
processing backlog based on the watermark lag.

We would greatly appreciate any comments or feedback you may have on this 
proposal.

Best,
Xuannan


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog


Re: [DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-05 Thread Xuannan Su
Hi Jing Ge,

Thank you for your valuable comments!

1. I agree with your suggestion regarding following the JavaBean
convention. It would be beneficial to incorporate this convention into
our Code Style Guide [1]. By doing so, we can ensure consistency and
make it easier for developers to adhere to the standard.

2. Yes, you are correct that the results remain the same. From my
understanding, this can be considered an implementation detail.

3. Under the current design, objectReuseCompliant = false actually
signifies an unknown state, and the decision to use object reuse then
depends on the global configuration pipeline.object-reuse;
objectReuseCompliant = false is also the default value for a
StreamOperator. On the other hand, objectReuseCompliant = true
indicates that object reuse can be employed. I don't think we need a
value that specifically enforces deep copying, since we will enable
object reuse for all operators when pipeline.object-reuse = true to
maintain the current behavior.
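
To illustrate the behavior described above, here is a minimal sketch of the
resulting copy-vs-reuse decision (illustrative only; the names do not match
the exact API in the FLIP):

    // Illustrative decision logic only.
    public final class ObjectReuseDecision {

        private ObjectReuseDecision() {}

        /**
         * Returns true if records emitted by the operator may be reused, false
         * if they must be deep-copied before being given to the next operator.
         */
        public static boolean canReuseOutput(
                boolean pipelineObjectReuse,     // global pipeline.object-reuse
                boolean objectReuseCompliant) {  // per-operator attribute, default false
            if (pipelineObjectReuse) {
                return true; // keep the current behavior: reuse for all operators
            }
            return objectReuseCompliant; // the attribute opts a single operator in
        }
    }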

Once again, thank you for your input, and let me know if there's
anything else I can assist you with.

Best regards,
Xuannan

[1] https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/

On Wed, Jul 5, 2023 at 1:37 AM Jing Ge  wrote:
>
> Hi Xuannan, Hi Dong
>
> Thanks for the Proposal! After reading the FLIP, I'd like to ask some
> questions:
>
> 1. Naming convention for boolean variables. It is recommended to follow
> JavaBean [1], i.e. objectReuseCompliant as the variable name with
> isObjectReuseCompliant() and setObjectReuseCompliant() as the method names.
>
>
> 2.
>
>    - *If pipeline.object-reuse is set to true, records emitted by this
>      operator will be re-used.*
>    - *Otherwise, if getIsObjectReuseCompliant() returns true, records emitted
>      by this operator will be re-used.*
>    - *Otherwise, records emitted by this operator will be deep-copied before
>      being given to the next operator in the chain.*
>
>
> If I understand you correctly, the hard-coded objectReuseCompliant
> should take priority over the configuration, so the checking logic
> should be:
>
>    - *If getIsObjectReuseCompliant() returns true, records emitted by this
>      operator will be re-used.*
>    - *Otherwise, if pipeline.object-reuse is set to true, records emitted by
>      this operator will be re-used.*
>    - *Otherwise, records emitted by this operator will be deep-copied before
>      being given to the next operator in the chain.*
>
>
> The results are the same but the checking logic is different. There are
> some additional thoughts, which lead us to the next question.
>
> 3. The current design lets specific operators enable object reuse and ignore
> the global config. There could be another thought, on the contrary: if an
> operator has hard-coded objectReuseCompliant as false, i.e. disabled
> object reuse on purpose, records should not be reused even if the global
> config pipeline.object-reuse is set to true. This suggests that
> objectReuseCompliant could be a triple-value logic: true: force object
> reuse; false: force deep copying; unknown: depends on the
> pipeline.object-reuse config.
>
>
> Best regards,
> Jing
>
>
> [1] https://en.wikipedia.org/wiki/JavaBeans
>
> On Mon, Jul 3, 2023 at 4:25 AM Xuannan Su  wrote:
>
> > Hi all,
> >
> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > add an operator attribute that allows operators to specify support for
> > object-reuse [1].
> >
> > Currently, the default configuration for pipeline.object-reuse is set
> > to false to avoid data corruption, which can result in suboptimal
> > performance. We propose adding APIs that operators can utilize to
> > inform the Flink runtime whether it is safe to reuse the emitted
> > records. This enhancement would enable Flink to maximize its
> > performance using the default configuration.
> >
> > Please refer to the FLIP document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on
> > this proposal.
> >
> > Best regards,
> >
> > Dong and Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
> >


[DISCUSS] FLIP-329: Add operator attribute to specify support for object-reuse

2023-07-02 Thread Xuannan Su
Hi all,

Dong(cc'ed) and I are opening this thread to discuss our proposal to
add an operator attribute that allows operators to specify support for
object-reuse [1].

Currently, the default configuration for pipeline.object-reuse is set
to false to avoid data corruption, which can result in suboptimal
performance. We propose adding APIs that operators can utilize to
inform the Flink runtime whether it is safe to reuse the emitted
records. This enhancement would enable Flink to maximize its
performance using the default configuration.
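
As a rough sketch of the kind of API we have in mind (the interface and
method names below are only illustrative; the concrete proposal is in the
FLIP document):

    // Illustrative sketch; see the FLIP for the actual proposed API.
    public interface ObjectReuseAware {

        /**
         * Returns true if this operator never caches or mutates the records it
         * emits, so the runtime may hand them to the next operator in the chain
         * without deep-copying them.
         */
        default boolean isObjectReuseCompliant() {
            return false; // conservative default: fall back to pipeline.object-reuse
        }
    }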

Please refer to the FLIP document for more details about the proposed
design and implementation. We welcome any feedback and opinions on
this proposal.

Best regards,

Dong and Xuannan

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749


[jira] [Created] (FLINK-32476) Support configuring object-reuse for internal operators

2023-06-28 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-32476:
--

 Summary: Support configuring object-reuse for internal operators
 Key: FLINK-32476
 URL: https://issues.apache.org/jira/browse/FLINK-32476
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Task
Reporter: Xuannan Su


Currently, object reuse is disabled by default for streaming jobs in order to 
prevent unexpected behavior. Object reuse becomes problematic when the upstream 
operator stores its output while the downstream operator modifies the input.

However, many operators implemented by Flink, such as Flink SQL operators, do 
not modify the input. This implies that it is safe to reuse the input object in 
such cases. Therefore, we intend to enable object reuse specifically for 
operators that do not modify the input.
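
For illustration, the following minimal, hypothetical snippet (not Flink
operator code) shows the kind of corruption that can occur when the upstream
side keeps a reference to an emitted record while the downstream side mutates
it in place:
{code:java}
// Hypothetical illustration of the hazard; this is not Flink operator code.
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseHazard {

    static class Record {
        long value;

        Record(long value) {
            this.value = value;
        }
    }

    public static void main(String[] args) {
        List<Record> upstreamBuffer = new ArrayList<>();
        Record reused = new Record(0);

        for (long i = 0; i < 3; i++) {
            // The "upstream" operator emits the same reused instance and also
            // stores it, e.g. in a window buffer.
            reused.value = i;
            upstreamBuffer.add(reused);

            // The "downstream" operator modifies the input in place.
            reused.value = reused.value * 10;
        }

        // Every buffered entry points to the same mutated object: this prints
        // 20, 20, 20 instead of the 0, 1, 2 the upstream operator stored.
        upstreamBuffer.forEach(r -> System.out.println(r.value));
    }
}
{code}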

As a first step, we will focus on the operators implemented within Flink. We
will create a FLIP to introduce an API that allows user-defined operators to
enable object reuse in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-326: Enhance Watermark to Support Processing-Time Temporal Join

2023-06-25 Thread Xuannan Su
Hi all,

Dong(cc'ed) and I are opening this thread to discuss our proposal to
enhance the watermark to properly support processing-time temporal
join, which has been documented in FLIP-326 [1].

By enhancing the expressiveness of the watermark, we want to support the use
case where records from the probe side of a processing-time temporal join need
to wait until the build side finishes its snapshot phase. Additionally, these
changes lay the groundwork for simplifying the DataStream APIs, eliminating the
need for users to explicitly differentiate between event time and processing
time, resulting in a more intuitive user experience.

Please refer to the FLIP document for more details about the proposed
design and implementation. We welcome any feedback and opinions on
this proposal.

Best regards,

Dong and Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-326%3A+Enhance+Watermark+to+Support+Processing-Time+Temporal+Join


[jira] [Created] (FLINK-32008) Protobuf format throws exception with Map datatype

2023-05-05 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-32008:
--

 Summary: Protobuf format throws exception with Map datatype
 Key: FLINK-32008
 URL: https://issues.apache.org/jira/browse/FLINK-32008
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-protobuf-example.zip

The protobuf format throws an exception when working with the Map data type. I
uploaded an example project to reproduce the problem.

 
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: Failed to deserialize PB object.
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
    at 
org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
    ... 15 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a 
protocol message, the input ended unexpectedly in the middle of a field.  This 
could mean either that the input has been truncated or that an embedded

[jira] [Created] (FLINK-31944) Protobuf format throw com.google.protobuf.InvalidProtocolBufferException

2023-04-26 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-31944:
--

 Summary: Protobuf format throw 
com.google.protobuf.InvalidProtocolBufferException
 Key: FLINK-31944
 URL: https://issues.apache.org/jira/browse/FLINK-31944
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-protobuf-example.zip

It seems that the protobuf format throws the following exception when the first
field of the message is of string type. This may also occur for other types. I
uploaded a Maven project to reproduce the problem.

 
{code:java}
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: Failed to deserialize PB object.
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
    at 
org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
    ... 15 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message 
contained an invalid tag (zero).
    at 
com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:133)
    at 
com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:633)
    at com.example.proto.Message.(Message.java:47)
    at com.example.proto.Message.(Message.java:9)
    at com.example.proto.Message$1.parsePartialFrom(Message.java:540)
    at com.example.proto.Message$1.parsePartialFrom(Message.java:534)
    at 
com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at com.example.proto.Message.parseFrom(Message.java:218)
    ... 21 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31943) Multiple t_env cause class loading problem

2023-04-25 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-31943:
--

 Summary: Multiple t_env cause class loading problem
 Key: FLINK-31943
 URL: https://issues.apache.org/jira/browse/FLINK-31943
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-sql-connector-kafka-1.17.0.jar, 
pyflink_classloader.py

When a PyFlink process creates multiple StreamTableEnvironments with different
EnvironmentSettings and sets "pipeline.jars" on the first created t_env, it
appears that the jar is not added to the classloader of the first created t_env.

 

After digging a little bit, the reason may be that when creating the second
table environment with a new EnvironmentSettings, the context classloader is
overwritten by a new classloader; see the `EnvironmentSettings.Builder.build` method.

 

I uploaded the script to reproduce the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30607) Table.to_pandas doesn't support Map type

2023-01-09 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-30607:
--

 Summary: Table.to_pandas doesn't support Map type
 Key: FLINK-30607
 URL: https://issues.apache.org/jira/browse/FLINK-30607
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.3
Reporter: Xuannan Su


It seems that the Table#to_pandas method in PyFlink doesn't support the Map
type. It throws the following exception.
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't 
support logical type MAP currently.
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617)
    at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:167)
    at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748) {code}
This can be reproduced with the following code.
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    DataTypes,
    Schema,
    StreamTableEnvironment,
    TableDescriptor,
)

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
    TableDescriptor.for_connector("datagen")
    .schema(
        Schema.new_builder()
        .column("val", DataTypes.MAP(DataTypes.INT(), DataTypes.INT()))
        .build()
    )
    .option("number-of-rows", "10")
    .build()
)
# Converting the MAP column to pandas triggers the exception above.
df = table.to_pandas()
print(df)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30258) PyFlink supports closing loop back server

2022-12-01 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-30258:
--

 Summary: PyFlink supports closing loop back server
 Key: FLINK-30258
 URL: https://issues.apache.org/jira/browse/FLINK-30258
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.16.0
Reporter: Xuannan Su


Currently, a loopback server is started whenever a StreamExecutionEnvironment
or StreamTableEnvironment is created. The loopback server can only be closed
after the process exits. This might not be a problem for regular use, where
only one environment object is created.

However, when running tests, such as the unit tests for PyFlink itself, more
and more environment objects are created, so the process starts more and more
loopback servers and consumes more and more resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

