Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-08-04 Thread Xia Sun
Hi all,

Thanks for all the feedback and suggestions so far. If there are no
further comments, we will open the voting thread tomorrow, Aug 6, 2024.

Best,
Xia

Lincoln Lee wrote on Thu, Aug 1, 2024 at 22:18:

> Hi Xia,
>
> Thanks for your updates! Looks good to me.
>
> Best,
> Lincoln Lee
>
>
> Xia Sun wrote on Thu, Aug 1, 2024 at 11:15:
>
> > Hi Lincoln,
> >
> > Thanks for your detailed explanation. I understand your concern.
> > Introducing configuration options with redundant semantics can indeed
> > confuse users, and the engine should minimize user exposure to these
> > details. Based on this premise, while also ensuring that users can choose
> > to enable the broadcast hash join optimization at either compile time or
> > runtime,
> > I think we can introduce a new configuration
> > `table.optimizer.adaptive-broadcast-join.strategy`, and reuse the
> existing
> > configuration `table.optimizer.join.broadcast-threshold` as a unified
> > threshold for determining broadcast hash join optimization. The
> > `table.optimizer.adaptive-broadcast-join.strategy` configuration would be
> > of an enumeration type with three options:
> >
> > AUTO: Flink will autonomously select the optimal timing for the
> > optimization.
> > RUNTIME_ONLY: The broadcast hash join optimization will only be performed
> > at runtime.
> > NONE: The broadcast hash join optimization will only be performed in the
> > compile phase.
> > And AUTO will be the default option.
> >
> > I have also updated this information in FLIP, PTAL.
> >
> > Best,
> > Xia
> >
> > Lincoln Lee wrote on Tue, Jul 30, 2024 at 23:39:
> >
> > > Thanks Xia for your explanation!
> > >
> > > I can understand your concern, but considering that the design of this
> > > FLIP already covers runtime de-optimization of inaccurate compile-time
> > > optimizations, is it necessary to make the user manually turn off
> > > 'table.optimizer.join.broadcast-threshold' and set the new
> > > 'table.optimizer.adaptive.join.broadcast-threshold' again? Another
> > > option is that users only need to focus on the existing broadcast size
> > > threshold, accept the reality that 100% accurate optimization cannot be
> > > done at compile time, and adopt the new capability of dynamic
> > > optimization at runtime; ultimately, users will trust that Flink will
> > > always optimize accurately. From this point of view, I would prefer a
> > > generic parameter 'table.optimizer.adaptive-optimization.enabled',
> > > which would allow for more dynamic optimizations in the future, is not
> > > limited to broadcast join scenarios, and will not continuously bring in
> > > more new options. WDYT?
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Xia Sun wrote on Tue, Jul 30, 2024 at 11:27:
> > >
> > > > Hi Lincoln,
> > > >
> > > > Thank you for your input and participation in the discussion!
> > > >
> > > > Compared to introducing the 'table.optimizer.adaptive-join.enabled'
> > > option,
> > > > introducing the 'table.optimizer.adaptive.join.broadcast-threshold'
> can
> > > > also cover the need to disable static broadcast optimization while
> only
> > > > enabling dynamic broadcast optimization. From this perspective,
> > > introducing
> > > > a new threshold configuration might be more appropriate. What do you
> > > think?
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > Lincoln Lee wrote on Mon, Jul 29, 2024 at 23:12:
> > > >
> > > > > +1 for this useful optimization!
> > > > >
> > > > > I have a question about the new option: do we really need two
> > > > > broadcast join thresholds? IIUC, this adaptive broadcast join is a
> > > > > complement to compile-time optimization, so there is no need for the
> > > > > user to configure two different thresholds (aside from the 'off'
> > > > > state represented by -1); we just want to control the adaptive
> > > > > optimization itself. Should we provide a configuration option like
> > > > > 'table.optimizer.adaptive-join.enabled' or a more general one,
> > > > > 'table.optimizer.adaptive-optimization.enabled', for such related
> > > > > optimizations?
> > > > >
> > > > >

Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-31 Thread Xia Sun
Hi Lincoln,

Thanks for your detailed explanation. I understand your concern.
Introducing configuration options with redundant semantics can indeed
confuse users, and the engine should minimize user exposure to these
details. Based on this premise, while also ensuring that users can choose
to enable the broadcast hash join optimization at either compile time or
runtime,
I think we can introduce a new configuration
`table.optimizer.adaptive-broadcast-join.strategy`, and reuse the existing
configuration `table.optimizer.join.broadcast-threshold` as a unified
threshold for determining broadcast hash join optimization. The
`table.optimizer.adaptive-broadcast-join.strategy` configuration would be
of an enumeration type with three options:

AUTO: Flink will autonomously select the optimal timing for the
optimization.
RUNTIME_ONLY: The broadcast hash join optimization will only be performed
at runtime.
NONE: The broadcast hash join optimization will only be performed in the
compile phase.
And AUTO will be the default option.
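For illustration, here is a minimal sketch (in Java) of how a user could
select the strategy once this FLIP lands. The strategy option name and its
enum values are the ones proposed above and are not in any released Flink
version yet; the threshold option already exists today:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AdaptiveJoinStrategyExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Proposed in this FLIP; AUTO is expected to be the default.
            tEnv.getConfig().getConfiguration().setString(
                    "table.optimizer.adaptive-broadcast-join.strategy", "RUNTIME_ONLY");
            // Existing option, reused as the unified broadcast threshold (bytes).
            tEnv.getConfig().getConfiguration().setString(
                    "table.optimizer.join.broadcast-threshold", "10485760"); // 10 MB
        }
    }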

I have also updated this information in FLIP, PTAL.

Best,
Xia

Lincoln Lee wrote on Tue, Jul 30, 2024 at 23:39:

> Thanks Xia for your explanation!
>
> I can understand your concern, but considering that the design of this
> FLIP already covers runtime de-optimization of inaccurate compile-time
> optimizations, is it necessary to make the user manually turn off
> 'table.optimizer.join.broadcast-threshold' and set the new
> 'table.optimizer.adaptive.join.broadcast-threshold' again? Another option
> is that users only need to focus on the existing broadcast size threshold,
> accept the reality that 100% accurate optimization cannot be done at
> compile time, and adopt the new capability of dynamic optimization at
> runtime; ultimately, users will trust that Flink will always optimize
> accurately. From this point of view, I would prefer a generic parameter
> 'table.optimizer.adaptive-optimization.enabled', which would allow for
> more dynamic optimizations in the future, is not limited to broadcast join
> scenarios, and will not continuously bring in more new options. WDYT?
>
>
> Best,
> Lincoln Lee
>
>
> Xia Sun wrote on Tue, Jul 30, 2024 at 11:27:
>
> > Hi Lincoln,
> >
> > Thank you for your input and participation in the discussion!
> >
> > Compared to introducing the 'table.optimizer.adaptive-join.enabled'
> option,
> > introducing the 'table.optimizer.adaptive.join.broadcast-threshold' can
> > also cover the need to disable static broadcast optimization while only
> > enabling dynamic broadcast optimization. From this perspective,
> introducing
> > a new threshold configuration might be more appropriate. What do you
> think?
> >
> > Best,
> > Xia
> >
> > Lincoln Lee wrote on Mon, Jul 29, 2024 at 23:12:
> >
> > > +1 for this useful optimization!
> > >
> > > I have a question about the new option: do we really need two broadcast
> > > join thresholds? IIUC, this adaptive broadcast join is a complement to
> > > compile-time optimization, so there is no need for the user to configure
> > > two different thresholds (aside from the 'off' state represented by -1);
> > > we just want to control the adaptive optimization itself. Should we
> > > provide a configuration option like
> > > 'table.optimizer.adaptive-join.enabled' or a more general one,
> > > 'table.optimizer.adaptive-optimization.enabled', for such related
> > > optimizations?
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Ron Liu wrote on Fri, Jul 26, 2024 at 11:59:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for your reply. It looks good to me.
> > > >
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun wrote on Fri, Jul 26, 2024 at 10:49:
> > > >
> > > > > Hi Ron,
> > > > >
> > > > > Thanks for your feedback!
> > > > >
> > > > > -> creation of the join operators until runtime
> > > > >
> > > > >
> > > > > That means when creating the AdaptiveJoinOperatorFactory, we will
> not
> > > > > immediately create the JoinOperator. Instead, we only pass in the
> > > > necessary
> > > > > parameters for creating the JoinOperator. The appropriate
> > JoinOperator
> > > > will
> > > > > be created during the StreamGraphOptimizationStrategy optimization
> > > phase.
> > > > >
> > > > > As you mentioned, the runtime's visibility into the table planner
> > > > > is indeed an issue.

Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-29 Thread Xia Sun
Hi Lincoln,

Thank you for your input and participation in the discussion!

Compared to introducing the 'table.optimizer.adaptive-join.enabled' option,
introducing the 'table.optimizer.adaptive.join.broadcast-threshold' can
also cover the need to disable static broadcast optimization while only
enabling dynamic broadcast optimization. From this perspective, introducing
a new threshold configuration might be more appropriate. What do you think?

Best,
Xia

Lincoln Lee wrote on Mon, Jul 29, 2024 at 23:12:

> +1 for this useful optimization!
>
> I have a question about the new option: do we really need two broadcast
> join thresholds? IIUC, this adaptive broadcast join is a complement to
> compile-time optimization, so there is no need for the user to configure
> two different thresholds (aside from the 'off' state represented by -1);
> we just want to control the adaptive optimization itself. Should we
> provide a configuration option like 'table.optimizer.adaptive-join.enabled'
> or a more general one, 'table.optimizer.adaptive-optimization.enabled',
> for such related optimizations?
>
>
> Best,
> Lincoln Lee
>
>
> Ron Liu wrote on Fri, Jul 26, 2024 at 11:59:
>
> > Hi, Xia
> >
> > Thanks for your reply. It looks good to me.
> >
> >
> > Best,
> > Ron
> >
> > Xia Sun wrote on Fri, Jul 26, 2024 at 10:49:
> >
> > > Hi Ron,
> > >
> > > Thanks for your feedback!
> > >
> > > -> creation of the join operators until runtime
> > >
> > >
> > > That means when creating the AdaptiveJoinOperatorFactory, we will not
> > > immediately create the JoinOperator. Instead, we only pass in the
> > necessary
> > > parameters for creating the JoinOperator. The appropriate JoinOperator
> > will
> > > be created during the StreamGraphOptimizationStrategy optimization
> phase.
> > >
> > > As you mentioned, the runtime's visibility into the table planner is
> > > indeed an issue. It involves two aspects:
> > > (1) we plan to place both implementations of the
> > > AdaptiveBroadcastJoinOptimizationStrategy and
> AdaptiveJoinOperatorFactory
> > > in the table layer. During the runtime phase, we will obtain the
> > > AdaptiveBroadcastJoinOptimizationStrategy through class loading.
> > Therefore,
> > > the flink-runtime does not need to be aware of the table layer's
> > > implementation.
> > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs
> to
> > > be aware of the table planner, we will consider placing the
> > > AdaptiveJoinOperatorFactory in the table planner module as well.
> > >
> > >
> > >  -> When did you configure these optimization strategies uniformly into
> > > > `execution.batch.adaptive.stream-graph-optimization.strategies`
> > >
> > >
> > > Thank you for pointing out this issue. When there are multiple
> > > StreamGraphOptimizationStrategies, the optimization order at the
> runtime
> > > phase will strictly follow the order specified in the configuration
> > option
> > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > Therefore,
> > > it is necessary to have a unified configuration during the SQL planner
> > > phase to ensure the correct optimization order. Currently, we are
> > > considering performing this unified configuration in
> > > BatchPlanner#afterTranslation().
> > >
> > > For simplicity, as long as the adaptive broadcast join/skewed join
> > > optimization features are enabled (e.g.,
> > > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > > corresponding strategy will be configured. This optimization is
> > independent
> > > of the specific SQL query, although it might not produce any actual
> > effect.
> > >
> > > Best,
> > > Xia
> > >
> > > Ron Liu wrote on Wed, Jul 24, 2024 at 14:10:
> > >
> > > > Hi, Xia
> > > >
> > > > This FLIP looks good to me, +1.
> > > >
> > > > I've two questions:
> > > >
> > > > 1.
> > > > >> Accordingly, in terms of implementation, we will delay the codegen
> > and
> > > > creation of the join operators until runtime.
> > > >
> > > > How are you delaying codegen to runtime? The current runtime is not
> > > > SQL planner aware; in other words, how should I understand this
> > > > sentence?
> > > >
> > > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to
> runtime

Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-25 Thread Xia Sun
Hi Ron,

Thanks for your feedback!

-> creation of the join operators until runtime


That means when creating the AdaptiveJoinOperatorFactory, we will not
immediately create the JoinOperator. Instead, we only pass in the necessary
parameters for creating the JoinOperator. The appropriate JoinOperator will
be created during the StreamGraphOptimizationStrategy optimization phase.
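As a rough sketch of this deferral idea (all class and method names below
are illustrative stand-ins, not the actual signatures of the
AdaptiveJoinOperatorFactory proposed in the FLIP):

    // Hypothetical sketch: a factory that captures join parameters at
    // compile time and defers operator creation/codegen to runtime.
    final class LazyJoinOperatorFactory {
        enum JoinStrategy { BROADCAST_HASH, SHUFFLE_HASH }

        private final JoinSpec joinSpec;        // captured at compile time
        private volatile JoinStrategy strategy; // decided later at runtime

        LazyJoinOperatorFactory(JoinSpec joinSpec) {
            this.joinSpec = joinSpec;
            this.strategy = JoinStrategy.SHUFFLE_HASH; // pessimistic default
        }

        // Invoked by the StreamGraph optimization strategy once the actual
        // input sizes are known.
        void switchTo(JoinStrategy decided) {
            this.strategy = decided;
        }

        // Codegen and operator creation happen only here, at runtime.
        Object createStreamOperator() {
            return strategy == JoinStrategy.BROADCAST_HASH
                    ? codegenBroadcastHashJoin(joinSpec)
                    : codegenShuffleHashJoin(joinSpec);
        }

        private Object codegenBroadcastHashJoin(JoinSpec s) { return new Object(); }
        private Object codegenShuffleHashJoin(JoinSpec s) { return new Object(); }

        static final class JoinSpec { /* join keys, types, etc. */ }
    }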

As you mentioned, the runtime's visibility into the table planner is
indeed an issue. It involves two aspects:
(1) we plan to place both implementations of the
AdaptiveBroadcastJoinOptimizationStrategy and AdaptiveJoinOperatorFactory
in the table layer. During the runtime phase, we will obtain the
AdaptiveBroadcastJoinOptimizationStrategy through class loading. Therefore,
the flink-runtime does not need to be aware of the table layer's
implementation.
(2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs to
be aware of the table planner, we will consider placing the
AdaptiveJoinOperatorFactory in the table planner module as well.


 -> When did you configure these optimization strategies uniformly into
> `execution.batch.adaptive.stream-graph-optimization.strategies`


Thank you for pointing out this issue. When there are multiple
StreamGraphOptimizationStrategies, the optimization order at the runtime
phase will strictly follow the order specified in the configuration option
`execution.batch.adaptive.stream-graph-optimization.strategies`. Therefore,
it is necessary to have a unified configuration during the SQL planner
phase to ensure the correct optimization order. Currently, we are
considering performing this unified configuration in
BatchPlanner#afterTranslation().
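A hedged sketch of what that registration could look like (the option key
is the one from FLIP-469; the strategy class name, its package, and the
list separator are assumptions for illustration only):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.configuration.Configuration;

    final class AdaptiveStrategyRegistration {
        static final String STRATEGIES_KEY =
                "execution.batch.adaptive.stream-graph-optimization.strategies";

        // Append the adaptive broadcast join strategy when the feature is
        // enabled; a negative threshold (-1) means it is disabled.
        static void register(Configuration conf, long broadcastThreshold) {
            if (broadcastThreshold < 0) {
                return;
            }
            List<String> strategies = new ArrayList<>();
            String existing = conf.getString(STRATEGIES_KEY, "");
            if (!existing.isEmpty()) {
                strategies.add(existing);
            }
            // Hypothetical fully-qualified name; loaded reflectively at runtime.
            strategies.add("org.apache.flink.table.runtime.strategy"
                    + ".AdaptiveBroadcastJoinOptimizationStrategy");
            conf.setString(STRATEGIES_KEY, String.join(";", strategies));
        }
    }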

For simplicity, as long as the adaptive broadcast join/skewed join
optimization features are enabled (e.g.,
`table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
corresponding strategy will be configured. This optimization is independent
of the specific SQL query, although it might not produce any actual effect.

Best,
Xia

Ron Liu wrote on Wed, Jul 24, 2024 at 14:10:

> Hi, Xia
>
> This FLIP looks good to me, +1.
>
> I've two questions:
>
> 1.
> >> Accordingly, in terms of implementation, we will delay the codegen and
> creation of the join operators until runtime.
>
> How are you delaying codegen to runtime? The current runtime is not SQL
> planner aware; in other words, how should I understand this sentence?
>
> 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to runtime via
> option `execution.batch.adaptive.stream-graph-optimization.strategies`. In
> SQL planner if you have multiple different optimization strategies like
> broadcast join, skew join, etc...  When did you configure these
> optimization strategies uniformly into
> `execution.batch.adaptive.stream-graph-optimization.strategies`?
>
>
>
> Zhu Zhu wrote on Fri, Jul 19, 2024 at 17:41:
>
> > +1 for the FLIP
> >
> > It's a good start to adaptively optimize the logical execution plan with
> > runtime information.
> >
> > Thanks,
> > Zhu
> >
> > Xia Sun wrote on Thu, Jul 18, 2024 at 18:23:
> >
> > > Hi devs,
> > >
> > > Junrui Lee, Lei Yang, and I would like to initiate a discussion about
> > > FLIP-470: Support Adaptive Broadcast Join[1].
> > >
> > > In general, Broadcast Hash Join is currently the most efficient join
> > > strategy available in Flink. However, its prerequisite is that the
> input
> > > data on one side must be sufficiently small; otherwise, it may lead to
> > > memory overuse or other issues. Currently, due to the lack of precise
> > > statistics, it is difficult to make accurate estimations during the
> Flink
> > > SQL Planning phase. For example, when an upstream Filter operator is
> > > present, it is easy to overestimate the size of the table, whereas with
> > > an expansion operator, the table size tends to be underestimated.
> > Moreover,
> > > once the join operator is determined, it cannot be modified at runtime.
> > >
> > > To address this issue, we plan to introduce Adaptive Broadcast Join
> > > capability based on FLIP-468: Introducing StreamGraph-Based Job
> > > Submission[2]
> > > and FLIP-469: Supports Adaptive Optimization of StreamGraph[3]. This
> will
> > > allow the join operator to be dynamically optimized to Broadcast Join
> > based
> > > on the actual input data volume at runtime and fall back when the
> > > optimization conditions are not met.
> > >
> > > For more details, please refer to FLIP-470[1]. We look forward to your
> > > feedback.
> > >
> > > Best,
> > > Junrui Lee, Lei Yang and Xia Sun
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > [3]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > >
> >
>


[DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-18 Thread Xia Sun
Hi devs,

Junrui Lee, Lei Yang, and I would like to initiate a discussion about
FLIP-470: Support Adaptive Broadcast Join[1].

In general, Broadcast Hash Join is currently the most efficient join
strategy available in Flink. However, its prerequisite is that the input
data on one side must be sufficiently small; otherwise, it may lead to
memory overuse or other issues. Currently, due to the lack of precise
statistics, it is difficult to make accurate estimations during the Flink
SQL Planning phase. For example, when an upstream Filter operator is
present, it is easy to overestimate the size of the table, whereas with
an expansion operator, the table size tends to be underestimated. Moreover,
once the join operator is determined, it cannot be modified at runtime.
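To make the estimation problem concrete, here is a toy illustration (all
numbers invented) of how a fixed selectivity assumption at compile time can
be off by orders of magnitude:

    // Toy example: compile-time estimate vs. runtime reality.
    public class SelectivityEstimateExample {
        public static void main(String[] args) {
            long inputBytes = 100L * 1024 * 1024 * 1024; // 100 GB scanned
            double assumedSelectivity = 0.5;             // planner's guess
            double actualSelectivity = 0.001;            // highly selective filter
            System.out.printf("estimated: %.1f GB, actual: %.1f GB%n",
                    inputBytes * assumedSelectivity / 1e9,
                    inputBytes * actualSelectivity / 1e9);
            // The ~54 GB estimate rules out broadcast join at compile time,
            // while the true ~0.1 GB would easily qualify at runtime.
        }
    }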

To address this issue, we plan to introduce Adaptive Broadcast Join
capability based on FLIP-468: Introducing StreamGraph-Based Job
Submission[2]
and FLIP-469: Supports Adaptive Optimization of StreamGraph[3]. This will
allow the join operator to be dynamically optimized to Broadcast Join based
on the actual input data volume at runtime and fall back when the
optimization conditions are not met.
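Conceptually, the runtime decision then reduces to a size check against the
broadcast threshold once the actual input statistics are available. A
minimal sketch (names are illustrative, not the FLIP's actual API):

    // Illustrative only: decide at runtime whether one join input
    // qualifies for broadcast based on its actually produced bytes.
    final class BroadcastDecision {
        static boolean canBroadcast(
                long actualInputBytes, long thresholdBytes, boolean sideIsBroadcastable) {
            if (thresholdBytes < 0) {
                return false; // -1 disables the optimization entirely
            }
            // Otherwise the originally planned join is kept (the fallback).
            return sideIsBroadcastable && actualInputBytes <= thresholdBytes;
        }
    }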

For more details, please refer to FLIP-470[1]. We look forward to your
feedback.

Best,
Junrui Lee, Lei Yang and Xia Sun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph


Re: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-06 Thread Xia Sun
Congratulations, Rui!

Best,
Xia

Paul Lam wrote on Thu, Jun 6, 2024 at 11:59:

> Congrats, Rui!
>
> Best,
> Paul Lam
>
> > On Jun 6, 2024 at 11:02, Junrui Lee wrote:
> >
> > Congratulations, Rui.
> >
> > Best,
> > Junrui
> >
> > Hang Ruan wrote on Thu, Jun 6, 2024 at 10:35:
> >
> >> Congratulations, Rui!
> >>
> >> Best,
> >> Hang
> >>
> >> Samrat Deb wrote on Thu, Jun 6, 2024 at 10:28:
> >>
> >>> Congratulations Rui
> >>>
> >>> Bests,
> >>> Samrat
> >>>
> >>> On Thu, 6 Jun 2024 at 7:45 AM, Yuxin Tan wrote:
> >>>
> >>>> Congratulations, Rui!
> >>>>
> >>>> Best,
> >>>> Yuxin
> >>>>
> >>>>
> >>>> Xuannan Su wrote on Thu, Jun 6, 2024 at 09:58:
> >>>>
> >>>>> Congratulations!
> >>>>>
> >>>>> Best regards,
> >>>>> Xuannan
> >>>>>
> >>>>> On Thu, Jun 6, 2024 at 9:53 AM Hangxiang Yu wrote:
> >>>>>>
> >>>>>> Congratulations, Rui !
> >>>>>>
> >>>>>>> On Thu, Jun 6, 2024 at 9:18 AM Lincoln Lee wrote:
> >>>>>>
> >>>>>>> Congratulations, Rui!
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Lincoln Lee
> >>>>>>>
> >>>>>>>
> >>>>>>>> Lijie Wang wrote on Thu, Jun 6, 2024 at 09:11:
> >>>>>>>
> >>>>>>>> Congratulations, Rui!
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lijie
> >>>>>>>>
> >>>>>>>>> Rodrigo Meneses wrote on Wed, Jun 5, 2024 at 21:35:
> >>>>>>>>
> >>>>>>>>> All the best
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 5, 2024 at 5:56 AM xiangyu feng <xiangyu...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Congratulations, Rui!
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Xiangyu Feng
> >>>>>>>>>>
> >>>>>>>>>>> Feng Jin wrote on Wed, Jun 5, 2024 at 20:42:
> >>>>>>>>>>
> >>>>>>>>>>> Congratulations, Rui!
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Feng Jin
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Jun 5, 2024 at 8:23 PM Yanfei Lei <fredia...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Congratulations, Rui!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Yanfei
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Luke Chen wrote on Wed, Jun 5, 2024 at 20:08:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Congrats, Rui!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Luke
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Jun 5, 2024 at 8:02 PM Jiabao Sun <jiabao...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Congrats, Rui. Well-deserved!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Jiabao
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Zhanghao Chen wrote on Wed, Jun 5, 2024 at 19:29:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Congrats, Rui!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Zhanghao Chen
> >>>>>>>>>>>>>>> 
> >>>>>>>>>>>>>>> From: Piotr Nowojski 
> >>>>>>>>>>>>>>> Sent: Wednesday, June 5, 2024 18:01
> >>>>>>>>>>>>>>> To: dev; rui fan <1996fan...@gmail.com>
> >>>>>>>>>>>>>>> Subject: [ANNOUNCE] New Apache Flink PMC Member - Fan Rui
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On behalf of the PMC, I'm very happy to announce another new
> >>>>>>>>>>>>>>> Apache Flink PMC Member - Fan Rui.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Rui has been active in the community since August 2019.
> >>>>>>>>>>>>>>> During this time he has contributed a lot of new features.
> >>>>>>>>>>>>>>> Among others:
> >>>>>>>>>>>>>>>  - Decoupling Autoscaler from Kubernetes Operator, and
> >>>>>>>>>>>>>>> supporting Standalone Autoscaler
> >>>>>>>>>>>>>>>  - Improvements to checkpointing, flamegraphs, restart
> >>>>>>>>>>>>>>> strategies, watermark alignment, network shuffles
> >>>>>>>>>>>>>>>  - Optimizing the memory and CPU usage of large operators,
> >>>>>>>>>>>>>>> greatly reducing the risk and probability of TaskManager OOM
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> He reviewed a significant number of PRs and has been active
> >>>>>>>>>>>>>>> both on the mailing lists and in Jira, helping to both
> >>>>>>>>>>>>>>> maintain and grow Apache Flink's community. He is also our
> >>>>>>>>>>>>>>> current Flink 1.20 release manager.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In the last 12 months, Rui has been the most active
> >>>>>>>>>>>>>>> contributor in the Flink Kubernetes Operator project, while
> >>>>>>>>>>>>>>> being the 2nd most active Flink contributor at the same time.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please join me in welcoming and congratulating Fan Rui!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Piotrek (on behalf of the Flink PMC)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best,
> >>>>>> Hangxiang.
> >>>>>
> >>>>
> >>>
> >>
>
>


[RESULT][VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-05 Thread Xia Sun
Hi all,

FLIP-445: Support dynamic parallelism inference for HiveSource[1] has been
accepted and voted through this thread [2].

The proposal has been accepted with 6 approving votes (5 binding) and there
is no disapproval:

- Muhammet Orazov (non-binding)
- Rui Fan (binding)
- Ron Liu (binding)
- Zhu Zhu (binding)
- Lijie Wang (binding)
- yuxia (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/lktnb162l2z3042m76to6xfbsdndy4r7

Best,
Xia


[VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-25 Thread Xia Sun
Hi everyone,

I'd like to start a vote on FLIP-445: Support dynamic parallelism inference
for HiveSource[1] which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/4k1qx6lodhbkknkhjyl0lq9bx8fcpjvn


Best,
Xia


Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-25 Thread Xia Sun
Hi Venkat,

Thanks for joining the discussion.
Based on our understanding, there are still a significant number of
existing tasks using Hive. Indeed, many companies are now migrating their
data to the lakehouse, but due to historical reasons, a substantial amount
of data still resides in Hive.

Best,
Xia

Venkatakrishnan Sowrirajan wrote on Thu, Apr 25, 2024 at 11:52:

> Hi Xia,
>
> +1 on introducing dynamic parallelism inference for HiveSource.
>
> Orthogonal to this discussion, curious, how commonly HiveSource is used
> these days in the industry given the popularity of table formats/sources
> like Iceberg, Hudi and Delta lake?
>
> Thanks
> Venkat
>
> On Wed, Apr 24, 2024, 7:41 PM Xia Sun  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback!
> >
> > If there are no more comments, I would like to start the vote thread,
> > thanks again!
> >
> > Best,
> > Xia
> >
> > Ahmed Hamdy wrote on Thu, Apr 18, 2024 at 21:31:
> >
> > > Hi Xia,
> > > I have read through the FLIP and discussion and the new version of the
> > FLIP
> > > looks better.
> > > +1 for the proposal.
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Thu, 18 Apr 2024 at 12:21, Ron Liu  wrote:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for updating, looks good to me.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun wrote on Thu, Apr 18, 2024 at 19:11:
> > > >
> > > > > Hi Ron,
> > > > > Yes, presenting it in a table might be more intuitive. I have
> already
> > > > added
> > > > > the table in the "Public Interfaces | New Config Option" chapter of
> > > FLIP.
> > > > > PTAL~
> > > > >
> > > > > > Ron Liu wrote on Thu, Apr 18, 2024 at 18:10:
> > > > >
> > > > > > Hi, Xia
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > > That means, in terms
> > > > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > > > `table.exec.hive.infer-source-parallelism.mode`.
> > > > > >
> > > > > > I still have some confusion: if
> > > > > > `table.exec.hive.infer-source-parallelism` takes priority over
> > > > > > `table.exec.hive.infer-source-parallelism.mode`, and
> > > > > > `table.exec.hive.infer-source-parallelism` currently defaults to
> > > > > > true, does that mean static parallelism inference will always be
> > > > > > in effect? Or perhaps after this FLIP, we change the default
> > > > > > behavior of `table.exec.hive.infer-source-parallelism` to indicate
> > > > > > dynamic parallelism inference when enabled.
> > > > > > I think you should list the various behaviors of these two
> options
> > > that
> > > > > > coexist in FLIP by a table, only then users can know how the
> > dynamic
> > > > and
> > > > > > static parallelism inference work.
> > > > > >
> > > > > > Best,
> > > > > > Ron
> > > > > >
> > > > > > Xia Sun wrote on Thu, Apr 18, 2024 at 16:33:
> > > > > >
> > > > > > > Hi Ron and Lijie,
> > > > > > > Thanks for joining the discussion and sharing your suggestions.
> > > > > > >
> > > > > > > > the InferMode class should also be introduced in the Public
> > > > > Interfaces
> > > > > > > > section!
> > > > > > >
> > > > > > >
> > > > > > > Thanks for the reminder, I have now added the InferMode class
> to
> > > the
> > > > > > Public
> > > > > > > Interfaces section as well.
> > > > > > >
> > > > > > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I
> > checked
> > > > > > through
> > > > > > > > the code that the default value is 1000?
> > > > > > >
> > > > > > >
> > > > > > > I have checked and the default value of
> > > > > > > `table.exec.hive.infer-source-parallelism.max` is indeed 1000.
> > > > > > > This has been corrected in the FLIP.

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-24 Thread Xia Sun
Hi everyone,

Thanks for all the feedback!

If there are no more comments, I would like to start the vote thread,
thanks again!

Best,
Xia

Ahmed Hamdy wrote on Thu, Apr 18, 2024 at 21:31:

> Hi Xia,
> I have read through the FLIP and discussion and the new version of the FLIP
> looks better.
> +1 for the proposal.
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 18 Apr 2024 at 12:21, Ron Liu  wrote:
>
> > Hi, Xia
> >
> > Thanks for updating, looks good to me.
> >
> > Best,
> > Ron
> >
> > Xia Sun wrote on Thu, Apr 18, 2024 at 19:11:
> >
> > > Hi Ron,
> > > Yes, presenting it in a table might be more intuitive. I have already
> > added
> > > the table in the "Public Interfaces | New Config Option" chapter of
> FLIP.
> > > PTAL~
> > >
> > > Ron Liu wrote on Thu, Apr 18, 2024 at 18:10:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for your reply.
> > > >
> > > > > That means, in terms
> > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > `table.exec.hive.infer-source-parallelism.mode`.
> > > >
> > > > I still have some confusion: if
> > > > `table.exec.hive.infer-source-parallelism` takes priority over
> > > > `table.exec.hive.infer-source-parallelism.mode`, and
> > > > `table.exec.hive.infer-source-parallelism` currently defaults to true,
> > > > does that mean static parallelism inference will always be in effect?
> > > > Or perhaps after this FLIP, we change the default behavior of
> > > > `table.exec.hive.infer-source-parallelism` to indicate dynamic
> > > > parallelism inference when enabled.
> > > > I think you should list the various behaviors of these two options
> that
> > > > coexist in FLIP by a table, only then users can know how the dynamic
> > and
> > > > static parallelism inference work.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun wrote on Thu, Apr 18, 2024 at 16:33:
> > > >
> > > > > Hi Ron and Lijie,
> > > > > Thanks for joining the discussion and sharing your suggestions.
> > > > >
> > > > > > the InferMode class should also be introduced in the Public
> > > Interfaces
> > > > > > section!
> > > > >
> > > > >
> > > > > Thanks for the reminder, I have now added the InferMode class to
> the
> > > > Public
> > > > > Interfaces section as well.
> > > > >
> > > > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> > > > through
> > > > > > the code that the default value is 1000?
> > > > >
> > > > >
> > > > > I have checked and the default value of
> > > > > `table.exec.hive.infer-source-parallelism.max` is indeed 1000. This
> > has
> > > > > been corrected in the FLIP.
> > > > >
> > > > > > how are`table.exec.hive.infer-source-parallelism` and
> > > > > > `table.exec.hive.infer-source-parallelism.mode` compatible?
> > > > >
> > > > >
> > > > > This is indeed a critical point. The current plan is to deprecate
> > > > > `table.exec.hive.infer-source-parallelism` but still utilize it as
> > the
> > > > main
> > > > > switch for enabling automatic parallelism inference. That means, in
> > > terms
> > > > > of priority, `table.exec.hive.infer-source-parallelism` >
> > > > > `table.exec.hive.infer-source-parallelism.mode`. In future
> versions,
> > if
> > > > > `table.exec.hive.infer-source-parallelism` is removed, this logic
> > will
> > > > also
> > > > > need to be revised, leaving only
> > > > > `table.exec.hive.infer-source-parallelism.mode` as the basis for
> > > deciding
> > > > > whether to enable parallelism inference. I have also added this
> > > > description
> > > > > to the FLIP.
> > > > >
> > > > >
> > > > > > In FLIP-367 it is supported to be able to set the Source's
> > > parallelism
> > > > > > individually, if in the future HiveSource also supports this
> > feature,
> > > > > > however, the default value of
> > > > > > `table.exec.hive.infer-source-parallelism.mode` is
> > > > > > `InferMode.DYNAMIC`, at this point will the parallelism be
> > > > > > dynamically derived or will the manually set parallelism take
> > > > > > effect, and who has the higher priority?

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-18 Thread Xia Sun
Hi Ron,
Yes, presenting it in a table might be more intuitive. I have already added
the table in the "Public Interfaces | New Config Option" chapter of FLIP.
PTAL~

Ron Liu wrote on Thu, Apr 18, 2024 at 18:10:

> Hi, Xia
>
> Thanks for your reply.
>
> > That means, in terms
> of priority, `table.exec.hive.infer-source-parallelism` >
> `table.exec.hive.infer-source-parallelism.mode`.
>
> I still have some confusion: if `table.exec.hive.infer-source-parallelism`
> takes priority over `table.exec.hive.infer-source-parallelism.mode`, and
> `table.exec.hive.infer-source-parallelism` currently defaults to true,
> does that mean static parallelism inference will always be in effect? Or
> perhaps after this FLIP, we change the default behavior of
> `table.exec.hive.infer-source-parallelism` to indicate dynamic parallelism
> inference when enabled.
> I think you should list the various behaviors of these two options that
> coexist in FLIP by a table, only then users can know how the dynamic and
> static parallelism inference work.
>
> Best,
> Ron
>
> Xia Sun wrote on Thu, Apr 18, 2024 at 16:33:
>
> > Hi Ron and Lijie,
> > Thanks for joining the discussion and sharing your suggestions.
> >
> > > the InferMode class should also be introduced in the Public Interfaces
> > > section!
> >
> >
> > Thanks for the reminder, I have now added the InferMode class to the
> Public
> > Interfaces section as well.
> >
> > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> through
> > > the code that the default value is 1000?
> >
> >
> > I have checked and the default value of
> > `table.exec.hive.infer-source-parallelism.max` is indeed 1000. This has
> > been corrected in the FLIP.
> >
> > > how are`table.exec.hive.infer-source-parallelism` and
> > > `table.exec.hive.infer-source-parallelism.mode` compatible?
> >
> >
> > This is indeed a critical point. The current plan is to deprecate
> > `table.exec.hive.infer-source-parallelism` but still utilize it as the
> main
> > switch for enabling automatic parallelism inference. That means, in terms
> > of priority, `table.exec.hive.infer-source-parallelism` >
> > `table.exec.hive.infer-source-parallelism.mode`. In future versions, if
> > `table.exec.hive.infer-source-parallelism` is removed, this logic will
> also
> > need to be revised, leaving only
> > `table.exec.hive.infer-source-parallelism.mode` as the basis for deciding
> > whether to enable parallelism inference. I have also added this
> description
> > to the FLIP.
> >
> >
> > > In FLIP-367 it is supported to be able to set the Source's parallelism
> > > individually, if in the future HiveSource also supports this feature,
> > > however, the default value of
> > > `table.exec.hive.infer-source-parallelism.mode` is `InferMode.DYNAMIC`,
> > at
> > > this point will the parallelism be dynamically derived or will the
> > manually
> > > set parallelism take effect, and who has the higher priority?
> >
> >
> > From my understanding, 'manually set parallelism' has the higher
> priority,
> > just like one of the preconditions for the effectiveness of dynamic
> > parallelism inference in the AdaptiveBatchScheduler is that the vertex's
> > parallelism isn't set. I believe whether it's static inference or dynamic
> > inference, the manually set parallelism by the user should be respected.
> >
> > > The `InferMode.NONE` option.
> >
> > Currently, 'adding InferMode.NONE' seems to be the prevailing opinion. I
> > will add InferMode.NONE as one of the Enum options in InferMode class.
> >
> > Best,
> > Xia
> >
> > Lijie Wang wrote on Thu, Apr 18, 2024 at 13:50:
> >
> > > Thanks for driving the discussion.
> > >
> > > +1 for the proposal and +1 for the `InferMode.NONE` option.
> > >
> > > Best,
> > > Lijie
> > >
> > > Ron liu wrote on Thu, Apr 18, 2024 at 11:36:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for driving this FLIP.
> > > >
> > > > This proposal looks good to me overall. However, I have the following
> > > minor
> > > > questions:
> > > >
> > > > 1. FLIP introduced `table.exec.hive.infer-source-parallelism.mode`
> as a
> > > new
> > > > parameter, and the value is the enum class `InferMode`, I think the
> > > > InferMode class should also be introduced in the Public Interfaces
> > > section!
> > > > 2. You mentioned in FLIP that the default value of
> > > > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked
> > > > through the code that the default value is 1000?

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-18 Thread Xia Sun
Hi Ron and Lijie,
Thanks for joining the discussion and sharing your suggestions.

> the InferMode class should also be introduced in the Public Interfaces
> section!


Thanks for the reminder, I have now added the InferMode class to the Public
Interfaces section as well.

> `table.exec.hive.infer-source-parallelism.max` is 1024, I checked through
> the code that the default value is 1000?


I have checked and the default value of
`table.exec.hive.infer-source-parallelism.max` is indeed 1000. This has
been corrected in the FLIP.

> how are`table.exec.hive.infer-source-parallelism` and
> `table.exec.hive.infer-source-parallelism.mode` compatible?


This is indeed a critical point. The current plan is to deprecate
`table.exec.hive.infer-source-parallelism` but still utilize it as the main
switch for enabling automatic parallelism inference. That means, in terms
of priority, `table.exec.hive.infer-source-parallelism` >
`table.exec.hive.infer-source-parallelism.mode`. In future versions, if
`table.exec.hive.infer-source-parallelism` is removed, this logic will also
need to be revised, leaving only
`table.exec.hive.infer-source-parallelism.mode` as the basis for deciding
whether to enable parallelism inference. I have also added this description
to the FLIP.
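To make the described priority concrete, here is a small sketch of the
intended resolution logic (the enum and helper are illustrative and simply
follow this discussion):

    // Hedged sketch: the deprecated boolean acts as the main switch, and
    // the mode only matters while that switch is on.
    final class InferModeResolution {
        enum InferMode { STATIC, DYNAMIC, NONE }

        static InferMode effectiveMode(boolean legacyInferEnabled, InferMode configuredMode) {
            if (!legacyInferEnabled) {
                // `table.exec.hive.infer-source-parallelism` = false wins over
                // whatever `...infer-source-parallelism.mode` is set to.
                return InferMode.NONE;
            }
            return configuredMode;
        }
    }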


> In FLIP-367 it is supported to be able to set the Source's parallelism
> individually, if in the future HiveSource also supports this feature,
> however, the default value of
> `table.exec.hive.infer-source-parallelism.mode` is `InferMode.DYNAMIC`, at
> this point will the parallelism be dynamically derived or will the manually
> set parallelism take effect, and who has the higher priority?


From my understanding, 'manually set parallelism' has the higher priority,
just like one of the preconditions for the effectiveness of dynamic
parallelism inference in the AdaptiveBatchScheduler is that the vertex's
parallelism isn't set. I believe whether it's static inference or dynamic
inference, the manually set parallelism by the user should be respected.

> The `InferMode.NONE` option.

Currently, 'adding InferMode.NONE' seems to be the prevailing opinion. I
will add InferMode.NONE as one of the Enum options in InferMode class.

Best,
Xia

Lijie Wang wrote on Thu, Apr 18, 2024 at 13:50:

> Thanks for driving the discussion.
>
> +1 for the proposal and +1 for the `InferMode.NONE` option.
>
> Best,
> Lijie
>
> Ron liu wrote on Thu, Apr 18, 2024 at 11:36:
>
> > Hi, Xia
> >
> > Thanks for driving this FLIP.
> >
> > This proposal looks good to me overall. However, I have the following
> minor
> > questions:
> >
> > 1. FLIP introduced `table.exec.hive.infer-source-parallelism.mode` as a
> new
> > parameter, and the value is the enum class `InferMode`, I think the
> > InferMode class should also be introduced in the Public Interfaces
> section!
> > 2. You mentioned in FLIP that the default value of
> > `table.exec.hive.infer-source-parallelism.max` is 1024, I checked through
> > the code that the default value is 1000?
> > 3. I also agree with Muhammet's idea that there is no need to introduce
> the
> > option `table.exec.hive.infer-source-parallelism.enabled`, and that
> > expanding the InferMode values will fulfill the need. There is another
> > issue to consider here though, how are
> > `table.exec.hive.infer-source-parallelism` and
> > `table.exec.hive.infer-source-parallelism.mode` compatible?
> > 4. In FLIP-367 it is supported to be able to set the Source's parallelism
> > individually, if in the future HiveSource also supports this feature,
> > however, the default value of
> > `table.exec.hive.infer-source-parallelism.mode` is `InferMode.DYNAMIC`,
> at
> > this point will the parallelism be dynamically derived or will the
> manually
> > set parallelism take effect, and who has the higher priority?
> >
> > Best,
> > Ron
> >
> > Xia Sun wrote on Wed, Apr 17, 2024 at 12:08:
> >
> > > Hi Jeyhun, Muhammet,
> > > Thanks for all the feedback!
> > >
> > > > Could you please mention the default values for the new
> configurations
> > > > (e.g., table.exec.hive.infer-source-parallelism.mode,
> > > > table.exec.hive.infer-source-parallelism.enabled,
> > > > etc) ?
> > >
> > >
> > > Thanks for your suggestion. I have supplemented the explanation
> regarding
> > > the default values.
> > >
> > > > Since we are introducing the mode as a configuration option,
> > > > could it make sense to have `InferMode.NONE` option also?
> > > > The `NONE` option would disable the inference.
> > >
> > >
> > > This is a good idea. Looking ahead, it could eliminate the need for
> > > introducing a new configuration option.

Re: [DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-16 Thread Xia Sun
Hi Jeyhun, Muhammet,
Thanks for all the feedback!

> Could you please mention the default values for the new configurations
> (e.g., table.exec.hive.infer-source-parallelism.mode,
> table.exec.hive.infer-source-parallelism.enabled,
> etc) ?


Thanks for your suggestion. I have supplemented the explanation regarding
the default values.

> Since we are introducing the mode as a configuration option,
> could it make sense to have `InferMode.NONE` option also?
> The `NONE` option would disable the inference.


This is a good idea. Looking ahead, it could eliminate the need for
introducing a new configuration option. I haven't identified any potential
compatibility issues as yet. If there are no further ideas from others,
I'll go ahead and update the FLIP to introduce InferMode.NONE.

Best,
Xia

Muhammet Orazov wrote on Wed, Apr 17, 2024 at 10:31:

> Hello Xia,
>
> Thanks for the FLIP!
>
> Since we are introducing the mode as a configuration option,
> could it make sense to have `InferMode.NONE` option also?
> The `NONE` option would disable the inference.
>
> This way we deprecate the `table.exec.hive.infer-source-parallelism`
> and no additional `table.exec.hive.infer-source-parallelism.enabled`
> option is required.
>
> What do you think?
>
> Best,
> Muhammet
>
> On 2024-04-16 07:07, Xia Sun wrote:
> > Hi everyone,
> > I would like to start a discussion on FLIP-445: Support dynamic
> > parallelism
> > inference for HiveSource[1].
> >
> > FLIP-379[2] has introduced dynamic source parallelism inference for
> > batch
> > jobs, which can utilize runtime information to more accurately decide
> > the
> > source parallelism. As a follow-up task, we plan to implement the
> > dynamic
> > parallelism inference interface for HiveSource, and also switch the
> > default
> > static parallelism inference to dynamic parallelism inference.
> >
> > Looking forward to your feedback and suggestions, thanks.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >
> > Best regards,
> > Xia
>


[DISCUSS] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-04-16 Thread Xia Sun
Hi everyone,
I would like to start a discussion on FLIP-445: Support dynamic parallelism
inference for HiveSource[1].

FLIP-379[2] has introduced dynamic source parallelism inference for batch
jobs, which can utilize runtime information to more accurately decide the
source parallelism. As a follow-up task, we plan to implement the dynamic
parallelism inference interface for HiveSource, and also switch the default
static parallelism inference to dynamic parallelism inference.
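For illustration, a simplified sketch of how a source might implement the
FLIP-379 interface is shown below. The interface and method names follow
the FLIP discussion (e.g., Context#getParallelismInferenceUpperBound);
please consult the released Flink API for the exact signatures:

    // Simplified, locally declared stand-in for FLIP-379's interface.
    interface DynamicParallelismInference {
        interface Context {
            int getParallelismInferenceUpperBound();
            long getDataVolumePerTask();
        }

        int inferParallelism(Context context);
    }

    // A hypothetical file-based source inferring parallelism from the
    // actual bytes it will read.
    class MyFileLikeSource implements DynamicParallelismInference {
        private final long totalBytesToRead; // e.g., summed file sizes

        MyFileLikeSource(long totalBytesToRead) {
            this.totalBytesToRead = totalBytesToRead;
        }

        @Override
        public int inferParallelism(Context context) {
            // One subtask per "data volume per task", capped by the upper bound.
            long tasks = Math.max(
                    1, totalBytesToRead / Math.max(1, context.getDataVolumePerTask()));
            return (int) Math.min(tasks, context.getParallelismInferenceUpperBound());
        }
    }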

Looking forward to your feedback and suggestions, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia


Re: [ANNOUNCE] New Apache Flink Committer - Zakelly Lan

2024-04-15 Thread Xia Sun
Congratulations Zakelly!

 Best,
 Xia

Leonard Xu wrote on Mon, Apr 15, 2024 at 16:16:

> Congratulations Zakelly!
>
>
> Best,
> Leonard
> > On Apr 15, 2024 at 3:56 PM, Samrat Deb wrote:
> >
> > Congratulations Zakelly!
>
>


Re: Question around Flink's AdaptiveBatchScheduler

2024-04-15 Thread Xia Sun
Hi Venkat,
I agree that the parallelism of the source vertex should not be upper
bounded by the job's global max parallelism. The case you mentioned, "high
filter selectivity with huge amounts of data to read", excellently supports
this viewpoint. (In fact, in the current implementation, if the source
parallelism is pre-specified at the job creation stage, rather than relying
on the dynamic parallelism inference of the AdaptiveBatchScheduler, the
source vertex's parallelism can indeed exceed the job's global max
parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency."
Currently, if a vertex has not set maxParallelism, the
AdaptiveBatchScheduler will use
`execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
maxParallelism. Since the current implementation does not distinguish
between source vertices and downstream vertices, source vertices are also
subject to this limitation.
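A small sketch of the fallback rule just described (the option key is the
real AdaptiveBatchScheduler option; the resolution logic is simplified for
illustration):

    // Simplified: a vertex without an explicit maxParallelism falls back
    // to the job-wide adaptive max
    // (`execution.batch.adaptive.auto-parallelism.max-parallelism`),
    // which is why source vertices are currently bounded by it as well.
    final class EffectiveMaxParallelism {
        static int of(Integer vertexMaxParallelism, int adaptiveGlobalMax) {
            return vertexMaxParallelism != null ? vertexMaxParallelism : adaptiveGlobalMax;
        }
    }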

Therefore, I believe that if the issue of "semantic consistency" can be
well explained in the code and configuration documentation, the
AdaptiveBatchScheduler should support that the parallelism of source
vertices can exceed the job's global max parallelism.

Best,
Xia

Venkatakrishnan Sowrirajan wrote on Sun, Apr 14, 2024 at 10:31:

> Let me state why I think
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should
> not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
>- Source vertex is unique and does not have any upstream vertices
>- Downstream vertices read shuffled data partitioned by key, which is
>not the case for the Source vertex
>- Limiting source parallelism by downstream vertices' max parallelism is
>incorrect
>
> If we say that for "semantic consistency" the source vertex parallelism
> has to be bound by the overall job's max parallelism, it can lead to the
> following issues:
>
>    - High filter selectivity with huge amounts of data to read - setting
>    a high "jobmanager.adaptive-batch-scheduler.max-parallelism" so that
>    source parallelism can be set higher can lead to small blocks and
>    sub-optimal performance.
>    - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism"
>    requires careful tuning of network buffer configurations, which is
>    unnecessary in cases where it is not required, just so that the source
>    parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee  wrote:
>
> > Hello Venkata krishnan,
> >
> > I think the term "semantic inconsistency" defined by
> > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> maintaining a
> > uniform upper limit on parallelism across all vertices within a job. As
> the
> > source vertices are part of the global execution graph, they should also
> > respect this rule to ensure consistent application of parallelism
> > constraints.
> >
> > Best,
> > Junrui
> >
> > > Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:
> >
> > > Gentle bump on this question. cc @Becket Qin  as
> > > well.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu> wrote:
> > >
> > > > Thanks for the response, Lijie and Junrui. Sorry for the late reply.
> > > > A few follow-up questions:
> > > >
> > > > > Source can actually ignore this limit
> > > > because it has no upstream, but this will lead to semantic
> > inconsistency.
> > > >
> > > > Lijie, can you please elaborate on the above comment further? What do
> > you
> > > > mean when you say it will lead to "semantic inconsistency"?
> > > >
> > > > > Secondly, we first need to limit the max parallelism of
> (downstream)
> > > > vertex, and then we can decide how many subpartitions (upstream
> vertex)
> > > > should produce. The limit should be effective, otherwise some
> > downstream
> > > > tasks will have no data to process.
> > > >
> > > > This makes sense in the context of any other vertices other than the
> > > > source vertex. As you mentioned above ("Source can actually ignore
> this
> > > > limit because it has no upstream"), therefore I feel "
> > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> > not
> > > > be upper bounded by
> > > "jobmanager.adaptive-batch-scheduler.max

Re: [ANNOUNCE] New Apache Flink PMC Member - Jing Ge

2024-04-14 Thread Xia Sun
Congratulations, Jing!

Best,
Xia

Ferenc Csaky wrote on Sat, Apr 13, 2024 at 00:50:

> Congratulations, Jing!
>
> Best,
> Ferenc
>
>
>
> On Friday, April 12th, 2024 at 13:54, Ron liu  wrote:
>
> >
> >
> > Congratulations, Jing!
> >
> > Best,
> > Ron
> >
> > > Junrui Lee jrlee@gmail.com wrote on Fri, Apr 12, 2024 at 18:54:
> >
> > > Congratulations, Jing!
> > >
> > > Best,
> > > Junrui
> > >
> > > > Aleksandr Pilipenko z3d...@gmail.com wrote on Fri, Apr 12, 2024 at 18:28:
> > >
> > > > Congratulations, Jing!
> > > >
> > > > Best Regards,
> > > > Aleksandr
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Lincoln Lee

2024-04-14 Thread Xia Sun
Congratulations Lincoln !

Best,
Xia

Ferenc Csaky wrote on Sat, Apr 13, 2024 at 00:50:

> Congratulations, Lincoln!
>
> Best,
> Ferenc
>
>
>
>
> On Friday, April 12th, 2024 at 15:54, lorenzo.affe...@ververica.com.INVALID
>  wrote:
>
> >
> >
> > Huge congrats! Well done!
> > On Apr 12, 2024 at 13:56 +0200, Ron liu ron9@gmail.com, wrote:
> >
> > > Congratulations, Lincoln!
> > >
> > > Best,
> > > Ron
> > >
> > > Junrui Lee jrlee@gmail.com wrote on Fri, Apr 12, 2024 at 18:54:
> > >
> > > > Congratulations, Lincoln!
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Aleksandr Pilipenko z3d...@gmail.com wrote on Fri, Apr 12, 2024 at 18:29:
> > > >
> > > > > > Congratulations, Lincoln!
> > > > > >
> > > > > > Best Regards
> > > > > > Aleksandr
>


Re: [PROPOSAL] Contribute Flink CDC Connectors project to Apache Flink

2023-12-10 Thread The Xia
+1

On 2023/12/11 02:33:24 Xin Gong wrote:
> 
> good news.
> 
> +1
> 
> Best,
> gongxin
> On 2023/12/07 03:24:59 Leonard Xu wrote:
> > Dear Flink devs,
> > 
> > As you may have heard, we at Alibaba (Ververica) are planning to donate CDC 
> > Connectors for the Apache Flink project[1] to the Apache Flink community.
> > 
> > CDC Connectors for Apache Flink comprise a collection of source connectors 
> > designed specifically for Apache Flink. These connectors[2] enable the 
> > ingestion of changes from various databases using Change Data Capture 
> > (CDC), most of these CDC connectors are powered by Debezium[3]. They 
> > support both the DataStream API and the Table/SQL API, facilitating the 
> > reading of database snapshots and continuous reading of transaction logs 
> > with exactly-once processing, even in the event of failures.
> > 
> > 
> > Additionally, in the latest version 3.0, we have introduced many 
> > long-awaited features. Starting from CDC version 3.0, we've built a 
> > Streaming ELT Framework available for streaming data integration. This 
> > framework allows users to write their data synchronization logic in a 
> > simple YAML file, which will automatically be translated into a Flink 
> > DataStreaming job. It emphasizes optimizing the task submission process and 
> > offers advanced functionalities such as whole database synchronization, 
> > merging sharded tables, and schema evolution[4].
> > 
> > 
> > I believe this initiative is a perfect match for both sides. For the Flink 
> > community, it presents an opportunity to enhance Flink's competitive 
> > advantage in streaming data integration, promoting the healthy growth and 
> > prosperity of the Apache Flink ecosystem. For the CDC Connectors project, 
> > becoming a sub-project of Apache Flink means being part of a neutral 
> > open-source community, which can attract a more diverse pool of 
> > contributors.
> > 
> > Please note that the aforementioned points represent only some of our 
> > motivations and vision for this donation. Specific future operations need 
> > to be further discussed in this thread. For example, the sub-project name 
> > after the donation; we hope to name it Flink-CDC, aiming at streaming data 
> > integration through Apache Flink, following the naming convention of 
> > Flink-ML; And this project is managed by a total of 8 maintainers, 
> > including 3 Flink PMC members and 1 Flink Committer. The remaining 4 
> > maintainers are also highly active contributors to the Flink community, 
> > donating this project to the Flink community implies that their permissions 
> > might be reduced. Therefore, we may need to bring up this topic for further 
> > discussion within the Flink PMC. Additionally, we need to discuss how to 
> > migrate existing users and documents. We have a user group of nearly 10,000 
> > people and a multi-version documentation site need to migrate. We also need 
> > to plan for the migration of CI/CD processes and other specifics. 
> > 
> > 
> > While there are many intricate details that require implementation, we are 
> > committed to progressing and finalizing this donation process.
> > 
> > 
> > Despite being Flink’s most active ecological project (as evaluated by 
> > GitHub metrics), it also boasts a significant user base. However, I believe 
> > it's essential to commence discussions on future operations only after the 
> > community reaches a consensus on whether they desire this donation.
> > 
> > 
> > Really looking forward to hearing what you think! 
> > 
> > 
> > Best,
> > Leonard (on behalf of the Flink CDC Connectors project maintainers)
> > 
> > [1] https://github.com/ververica/flink-cdc-connectors
> > [2] 
> > https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html
> > [3] https://debezium.io
> > [4] 
> > https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-pipeline.html
> 


[RESULT][VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-12-05 Thread Xia Sun
Dear developers,

FLIP-379: Dynamic source parallelism inference for batch jobs[1] has been
accepted and voted through this thread [2].

The proposal received 6 approving binding votes and there is no disapproval:

- Zhu Zhu (binding)
- Lijie Wang (binding)
- Rui Fan (binding)
- Etienne Chauchot (binding)
- Leonard Xu (binding)
- Jingsong Li (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/g03m2r8dodz6gn8jgf36mvq60h1tsnqg

Best,
Xia


[VOTE] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-29 Thread Xia Sun
Hi everyone,

I'd like to start a vote on FLIP-379: Dynamic source parallelism inference
for batch jobs[1] which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
[2] https://lists.apache.org/thread/ocftkqy5d2x4n58wzprgm5qqrzzkbmb8


Best Regards,
Xia


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-28 Thread Xia Sun
Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is
no further discussion.

Best,
Xia

Leonard Xu wrote on Fri, Nov 24, 2023 at 18:50:

> Thanks Xia and Zhu Zhu for driving this work,
>
> It will help unify the parallelism inference for all operators of batch
> job, the updated FLIP looks good to me.
>
> Best,
> Leonard
>
>
> > On Nov 24, 2023, at 5:53 PM, Xia Sun wrote:
> >
> > Hi all,
> > Offline discussed with Zhu Zhu and Leonard Xu and we have reached the
> > following three points of consensus:
> >
> > 1. Rename the interface method Context#getMaxSourceParallelism proposed
> by
> > the FLIP to Context#getParallelismInferenceUpperBound, to make the
> meaning
> > of the method clearer. See [1] for details.
> >
> > 2. We provide a more detailed explanation of the effective priority of
> the
> > dynamic source parallelism inference proposed by this FLIP and the order
> of
> > values for the upper bound of source parallelism. We also point out the
> > current support and limitations of the AdaptiveBatchScheduler regarding
> > source parallelism inference. See [2] for details.
> >
> > 3. This FLIP will only focus on the framework-level implementation and
> will
> > prioritize the implementation of FileSource as an example of the new
> > interface proposed by the FLIP. The HiveSource, due to its existing
> static
> > parallelism inference and changes in default values for
> > configuration items such as `table.exec.hive.infer-source-parallelism`,
> > requires a more detailed migration plan, as well as more comprehensive
> > design and discussion. It is not suitable as part of this FLIP and needs
> a
> > separate FLIP. Therefore, we have removed the HiveSource part from this
> > FLIP.
> >
> > Thanks again to everyone who participated in the discussion.
> > Looking forward to your continued feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges
> >
> > Best,
> > Xia
> >
Leonard Xu wrote on Wed, Nov 22, 2023 at 18:37:
> >
> >> Thanks Xia for the reply, and sorry for the late response.
> >>
> >>> Thanks for pointing out the issue, the current wording does indeed seem
> >> to
> >>> be confusing. It involves the existing implementation of the
> >>> AdaptiveBatchScheduler, where the dynamically inferred parallelism
> cannot
> >>> exceed the JobVertex's maxParallelism (which is typically set to either
> >> the
> >>> global default max parallelism or the user-specified JobVertex max
> >>> parallelism), so the FLIP maintains the logic. I have modified the
> >>> FLIP to avoid confusion as much as possible.
> >>
> >> I didn’t see the change part in this FLIP, could you check it?
> >>
> >>> We can use Configuration::getOptional to check if the user has
> configured
> >>> the
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
> >>
> >> Using Readable#getOptional(ConfigOption option) makes sense to me.
> >>
> >>> As a follow-up task, we may have a dedicated discussion in the future
> to
> >>> see if we need to change the default value of
> >>> `table.exec.hive.infer-source-parallelism` to false. Before then, user
> >> can
> >>> manually set `table.exec.hive.infer-source-parallelism` to false to
> >> enable
> >>> dynamic parallelism inference, and use
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> to
> >>> replace `table.exec.hive.infer-source-parallelism.max` as the
> parallelism
> >>> inference upper bound. I have updated both the FLIP's
> >>> DynamicParallelismInference interface implementation and Migration Plan
> >>> modules to illustrate this.
> >>
> >> In my opinion, moving HiveSource to subsequent discussion is not OK, see
> >> my explanation: HiveSource support for dynamic source parallelism
> >> inference is one part of the FLIP implementation; it looks like we
> >> introduce a configuration
> >> `execution.batch.adaptive.auto-paralleli

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-24 Thread Xia Sun
Hi all,
Offline discussed with Zhu Zhu and Leonard Xu and we have reached the
following three points of consensus:

1. Rename the interface method Context#getMaxSourceParallelism proposed by
the FLIP to Context#getParallelismInferenceUpperBound, to make the meaning
of the method clearer. See [1] for details.

2. We provide a more detailed explanation of the effective priority of the
dynamic source parallelism inference proposed by this FLIP and the order of
values for the upper bound of source parallelism. We also point out the
current support and limitations of the AdaptiveBatchScheduler regarding
source parallelism inference. See [2] for details.

3. This FLIP will only focus on the framework-level implementation and will
prioritize the implementation of FileSource as an example of the new
interface proposed by the FLIP. The HiveSource, due to its existing static
parallelism inference and changes in default values for
configuration items such as `table.exec.hive.infer-source-parallelism`,
requires a more detailed migration plan, as well as more comprehensive
design and discussion. It is not suitable as part of this FLIP and needs a
separate FLIP. Therefore, we have removed the HiveSource part from this
FLIP.

Thanks again to everyone who participated in the discussion.
Looking forward to your continued feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges

Best,
Xia

Leonard Xu wrote on Wed, Nov 22, 2023 at 18:37:

> Thanks Xia for the reply, and sorry for the late response.
>
> > Thanks for pointing out the issue, the current wording does indeed seem
> to
> > be confusing. It involves the existing implementation of the
> > AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot
> > exceed the JobVertex's maxParallelism (which is typically set to either
> the
> > global default max parallelism or the user-specified JobVertex max
> > parallelism), so the FLIP maintains the logic. I have modified the
> > FLIP to avoid confusion as much as possible.
>
> I didn’t see the change part in this FLIP, could you check it?
>
> > We can use Configuration::getOptional to check if the user has configured
> > the
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
>
> Using Readable#getOptional(ConfigOption option) makes sense to me.
>
> > As a follow-up task, we may have a dedicated discussion in the future to
> > see if we need to change the default value of
> > `table.exec.hive.infer-source-parallelism` to false. Before then, user
> can
> > manually set `table.exec.hive.infer-source-parallelism` to false to
> enable
> > dynamic parallelism inference, and use
> > `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
> > replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
> > inference upper bound. I have updated both the FLIP's
> > DynamicParallelismInference interface implementation and Migration Plan
> > modules to illustrate this.
>
> In my opinion, moving HiveSource to a subsequent discussion is not OK; see
> my explanation: HiveSource support for dynamic source parallelism inference
> is one part of the FLIP implementation, and it looks like we introduce a
> configuration
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> which conflicts with the existing configuration
> `table.exec.hive.infer-source-parallelism`. But we do not provide conflict
> resolution in the current FLIP, just postpone the work to future
> discussions; from past community work experience, this uncertain state is
> likely to cause the subsequent discussions to be shelved. I’d suggest we
> make this part clear in this FLIP.
>
>
> Best,
> Leonard
>
>
> >
> >
> > Leonard Xu wrote on Thu, Nov 16, 2023 at 12:36:
> >
> >> Thanks Xia for the detailed reply.
> >>
> >>>> `How do users disable the parallelism inference if they want to use
> >>>> fixed source parallelism?`
> >>>> `Could you explain the priority between the static parallelism set
> >>>> from the table layer and the proposed dynamic source parallelism?`
> >>>
> >>> From the user's perspective, if the user specifies a fixed parallelism
> >> for
> >>> the source, dynamic source parallelism inference will be automatically
> >>> disabled. From the perspective of priority, the user’s specified
> >>> para

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-20 Thread Xia Sun
Thanks Leonard for the detailed feedback and input.

> The 'Max source parallelism’ is the information that the runtime offered to
> the Source as a hint to infer the actual parallelism; a name with a max
> prefix but calculated with a minimum value confuses me a lot, especially
> when I read the HiveSource pseudocode:
>
> fileEnumerator.setMinNumSplits(maxSourceParallelism);
>
> Although I understand that naming is a complex topic in CS, could we
> improve this method name a little?

Thanks for pointing out the issue; the current wording does indeed seem to
be confusing. It involves the existing implementation of the
AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot
exceed the JobVertex's maxParallelism (which is typically set to either the
global default max parallelism or the user-specified JobVertex max
parallelism), so the FLIP maintains this logic. I have modified the FLIP to
avoid confusion as much as possible.

> And, the `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> config has a default value of 1; how do we distinguish whether the user set
> it or not, for example, if the user happens to set the value 1?

We can use Configuration::getOptional to check whether the user has
configured `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
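
For illustration, here is a minimal sketch of that check. The ConfigOption is
redefined locally for the example (in Flink it lives with the adaptive batch
scheduler options), and the default value 1 mirrors the default discussed
above:

    import java.util.Optional;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class DefaultSourceParallelismCheck {

        // Redefined here for illustration only.
        static final ConfigOption<Integer> DEFAULT_SOURCE_PARALLELISM =
                ConfigOptions.key(
                                "execution.batch.adaptive.auto-parallelism.default-source-parallelism")
                        .intType()
                        .defaultValue(1);

        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // getOptional() is empty when the user did not set the option, even
            // though get() would return the default value 1.
            Optional<Integer> userValue = conf.getOptional(DEFAULT_SOURCE_PARALLELISM);
            System.out.println(userValue.isPresent()); // false: not configured

            conf.set(DEFAULT_SOURCE_PARALLELISM, 1);

            // The user happened to set exactly the default value; getOptional()
            // still reports it as explicitly configured.
            System.out.println(conf.getOptional(DEFAULT_SOURCE_PARALLELISM).isPresent()); // true
        }
    }

This distinguishes "explicitly set to 1" from "left at the default of 1",
which a plain get() cannot do.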

> No doubt that it’s an API breaking change. For existing hive users, the
> migration path is not clear in this FLIP; for example, current users used
> the splits number to infer the source parallelism. After this FLIP, could
> we give the recommended value of
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or
> how to set it, or do users not even need to set anything? And the
> replacement for migration should be added to
> 'table.exec.hive.infer-source-parallelism’s description when we propose to
> change its default value, right?

As a follow-up task, we may have a dedicated discussion in the future to
see if we need to change the default value of
`table.exec.hive.infer-source-parallelism` to false. Before then, users can
manually set `table.exec.hive.infer-source-parallelism` to false to enable
dynamic parallelism inference, and use
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
inference upper bound. I have updated both the FLIP's
DynamicParallelismInference interface implementation and Migration Plan
modules to illustrate this.
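
As a rough sketch of that interim setup (the option keys are the ones named
above; the value 128 is only an illustrative upper bound, not a
recommendation from this thread):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HiveDynamicInferenceSetup {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Turn off Hive's static source parallelism inference so the
            // adaptive batch scheduler can infer parallelism at runtime.
            conf.setString("table.exec.hive.infer-source-parallelism", "false");

            // Takes over the role of table.exec.hive.infer-source-parallelism.max
            // as the parallelism inference upper bound.
            conf.setString(
                    "execution.batch.adaptive.auto-parallelism.default-source-parallelism",
                    "128");

            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance()
                                    .inBatchMode()
                                    .withConfiguration(conf)
                                    .build());
        }
    }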

> The pseudocode shows:
>
> fileEnumerator.getInferredSourceParallelism();
>
> IIRC, our public API FileEnumerator never offers such a method; is
> introducing getInferredSourceParallelism() also one part of our FLIP?

The intent was to make the pseudocode easier to understand, but it did
introduce some confusion. There are no plans to introduce
getInferredSourceParallelism() in HiveSource, and I've modified the
HiveSource pseudocode in the FLIP.

Best regards,

Xia

Leonard Xu wrote on Thu, Nov 16, 2023 at 12:36:

> Thanks Xia for the detailed reply.
>
> >> `How do users disable the parallelism inference if they want to use
> >> fixed source parallelism?`
> >> `Could you explain the priority between the static parallelism set from
> >> the table layer and the proposed dynamic source parallelism?`
> >
> > From the user's perspective, if the user specifies a fixed parallelism
> for
> > the source, dynamic source parallelism inference will be automatically
> > disabled. From the perspective of priority, the user’s specified
> > parallelism > the static parallelism inference > dynamic parallelism
> > inference. Because the dynamic source parallelism inference will take
> > effect at the runtime stage and the validity conditions are: (1) the
> > current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
> > source vertex is not specified (that is, the parallelism is -1).
>
> The priority explanation make sense to me, could you also add this
> explanation to FLIP?
>
> >> `So, could we consider the boundedness info when designing the
> >> interface? Both FileSource and Hive Source offer streaming read
> >> ability; imagine this case: Flink Streaming Hive Source should not
> >> apply the dynamic source parallelism even if it implemented the
> >> feature, as it is serving a streaming job.`
> >
> > Thanks for your feedback, it is really a good input. Currently, the
> > dynamic parallelism inference logic is only triggered in batch jobs.
> > Therefore, the logic will not be called in streaming jobs.
> > In the future, if streaming jobs also support runtime parallelism
> > inference, then theoretically, the source can no longer be distinguished
> > between streaming jobs and batch jobs at the runtime stage. In addition,
> > since the new interface is implemented together with the Source
> interface,
>

Re: [VOTE] FLIP-381: Deprecate configuration getters/setters that return/set complex Java objects

2023-11-12 Thread Xia Sun
+1 (non-binding)

Best,
Xia

Samrat Deb wrote on Mon, Nov 13, 2023 at 12:37:

> +1 (non binding)
>
> Bests,
> Samrat
>
> On Mon, 13 Nov 2023 at 9:10 AM, Yangze Guo  wrote:
>
> > +1 (binding)
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Nov 13, 2023 at 11:35 AM weijie guo 
> > wrote:
> > >
> > > +1(binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Lijie Wang wrote on Mon, Nov 13, 2023 at 10:40:
> > >
> > > > +1 (binding)
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > Yuepeng Pan wrote on Fri, Nov 10, 2023 at 18:32:
> > > >
> > > > > +1(non-binding)
> > > > >
> > > > > Best,
> > > > > Roc
> > > > >
> > > > > On 2023/11/10 03:58:10 Junrui Lee wrote:
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thank you to everyone for the feedback on FLIP-381: Deprecate
> > > > > configuration
> > > > > > getters/setters that return/set complex Java objects[1] which has
> > been
> > > > > > discussed in this thread [2].
> > > > > >
> > > > > > I would like to start a vote for it. The vote will be open for at
> > least
> > > > > 72
> > > > > > hours (excluding weekends) unless there is an objection or not
> > enough
> > > > > votes.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
> > > > > > [2]
> > https://lists.apache.org/thread/y5owjkfxq3xs9lmpdbl6d6jmqdgbjqxo
> > > > > >
> > > > >
> > > >
> >
>


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-09 Thread Xia Sun
Thanks Leonard for the feedback, and sorry for my late response.

> `How do users disable the parallelism inference if they want to use fixed
> source parallelism?`
> `Could you explain the priority between the static parallelism set from
> the table layer and the proposed dynamic source parallelism?`

From the user's perspective, if the user specifies a fixed parallelism for
the source, dynamic source parallelism inference will be automatically
disabled. From the perspective of priority: the user’s specified
parallelism > the static parallelism inference > dynamic parallelism
inference. This is because the dynamic source parallelism inference takes
effect at the runtime stage, and its validity conditions are: (1) the
current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
source vertex is not specified (that is, the parallelism is -1).
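
To make that order concrete, here is a hypothetical sketch of the resolution
logic (all names are illustrative, not Flink internals):

    final class SourceParallelismPriority {

        static final int UNDEFINED = -1; // Flink's marker for "parallelism not set"

        static int resolve(
                int userSpecified,      // fixed parallelism from the user or table layer
                int staticallyInferred, // connector compile-time inference, or -1
                boolean isDynamicGraph, // condition (1): the ExecutionGraph is dynamic
                java.util.function.IntSupplier dynamicInference) {
            if (userSpecified != UNDEFINED) {
                return userSpecified;      // highest priority; disables inference
            }
            if (staticallyInferred != UNDEFINED) {
                return staticallyInferred; // the source vertex is no longer -1
            }
            if (isDynamicGraph) {
                // Condition (2) holds here: the source vertex is still -1.
                return dynamicInference.getAsInt();
            }
            return UNDEFINED; // the scheduler falls back to its defaults
        }
    }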

> `the workflow for streaming jobs may look like ... which is totally
> different, the latter one lacks a lot of infra in Flink, right?`

Indeed, as of now, the dynamic parallelism inference is exclusively for
batch jobs, so it only takes into account the necessary information for
batch scenarios. In the future, when we introduce support for automatic
parallelism inference in streaming jobs, we can include the required
information for streaming jobs to avoid unnecessarily complicating the
current design.
Moreover, the workflow you mentioned seems a bit complicated. Our current
idea is to perform the parallelism inference during the initialization
phase of streaming jobs and proceed to schedule the entire job once the
source parallelism is determined. This process will naturally occur during
job startup, eliminating the need for additional restarts.

> `So, could we consider the boundedness info when designing the interface?
> Both FileSource and Hive Source offer streaming read ability; imagine this
> case: Flink Streaming Hive Source should not apply the dynamic source
> parallelism even if it implemented the feature, as it is serving a
> streaming job.`

Thanks for your feedback, it is really a good input. Currently, the
dynamic parallelism inference logic is only triggered in batch jobs.
Therefore, the logic will not be called in streaming jobs.
In the future, if streaming jobs also support runtime parallelism inference,
then theoretically a source can no longer be distinguished as serving a
streaming or a batch job at the runtime stage. In addition, since the new
interface is implemented together with the Source interface, the
Source::getBoundedness() method can also be consulted when inferring
parallelism.
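
A small sketch of such a guard, assuming the check happens wherever the
runtime already holds the Source instance:

    import org.apache.flink.api.connector.source.Boundedness;
    import org.apache.flink.api.connector.source.Source;

    final class DynamicInferenceGuard {

        // Only apply dynamic parallelism inference to bounded (batch) reads,
        // even if the connector implements the inference interface.
        static boolean mayApplyDynamicInference(Source<?, ?, ?> source) {
            return source.getBoundedness() == Boundedness.BOUNDED;
        }
    }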

Best regards,
Xia

Leonard Xu wrote on Wed, Nov 8, 2023 at 16:19:

> Thanks Xia and Zhu Zhu for kickoff this discussion.
>
> The dynamic source parallelism inference is a useful feature for batch
> story. I’ve some comments about current design.
>
> 1. How do users disable the parallelism inference if they want to use
> fixed source parallelism? They can configure a fixed parallelism in the
> table layer currently, as you explained above.
>
> 2. Could you explain the priority between the static parallelism set from
> the table layer and the proposed dynamic source parallelism? And changing
> the default value of `table.exec.hive.infer-source-parallelism` as a
> sub-task does not resolve all cases, because other Sources can set their
> own parallelism too.
>
> 3. The current design only works for batch jobs; the workflow for a
> streaming job may look like: (1) infer parallelism for a streaming source
> like Kafka, (2) stop the job with a savepoint, (3) apply the new
> parallelism to the job, (4) schedule the streaming job from the savepoint,
> which is totally different; the latter one lacks a lot of infra in Flink,
> right? So, could we consider the boundedness info when designing the
> interface? Both FileSource and Hive Source offer streaming read ability;
> imagine this case: Flink Streaming Hive Source should not apply the
> dynamic source parallelism even if it implemented the feature, as it is
> serving a streaming job.
>
> Best,
> Leonard
>
>
> > On Nov 1, 2023, at 6:21 PM, Xia Sun wrote:
> >
> > Thanks Lijie for the comments!
> > 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> > superset of static parallelism inference. As a follow-up task, we can
> > consider changing the default value of
> > 'table.exec.hive.infer-source-parallelism' to false.
> >
> > 2. I think that both dynamic parallelism inference and static parallelism
> > inference have their own use cases. Currently, for streaming sources and
> > other sources that are not sensitive to dynamic information, the benefits
> > of dynamic parallelism inference may not be significant. In such cases,
> we
> > can continue to use static parallelism inference.
> >
> > Thanks,
> > Xia
> >
> > Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52:
> >
> >> Hi Xia,
> >&

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-01 Thread Xia Sun
Thanks Lijie for the comments!
1. For Hive source, dynamic parallelism inference in batch scenarios is a
superset of static parallelism inference. As a follow-up task, we can
consider changing the default value of
'table.exec.hive.infer-source-parallelism' to false.

2. I think that both dynamic parallelism inference and static parallelism
inference have their own use cases. Currently, for streaming sources and
other sources that are not sensitive to dynamic information, the benefits
of dynamic parallelism inference may not be significant. In such cases, we
can continue to use static parallelism inference.

Thanks,
Xia

Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52:

> Hi Xia,
>
> Thanks for driving this FLIP, +1 for the proposal.
>
> I have 2 questions about the relationship between static inference and
> dynamic inference:
>
> 1. AFAIK, currently the hive table source enables static inference by
> default. In this case, which one (static vs dynamic) will take effect? I
> think it would be better if we can point this out in the FLIP.
>
> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> we have a plan to deprecate the static inference in the future?
>
> Best,
> Lijie
>
> Zhu Zhu wrote on Tue, Oct 31, 2023 at 20:19:
>
> > Thanks for opening the FLIP and kicking off this discussion, Xia!
> > The proposed changes make up an important missing part of the dynamic
> > parallelism inference of adaptive batch scheduler.
> >
> > Besides that, it is also one good step towards supporting dynamic
> > parallelism inference for streaming sources, e.g. allowing Kafka
> > sources to determine its parallelism automatically based on the
> > number of partitions.
> >
> > +1 for the proposal.
> >
> > Thanks,
> > Zhu
> >
> > Xia Sun wrote on Tue, Oct 31, 2023 at 16:01:
> >
> > > Hi everyone,
> > > I would like to start a discussion on FLIP-379: Dynamic source
> > parallelism
> > > inference for batch jobs[1].
> > >
> > > In general, there are three main ways to set source parallelism for
> batch
> > > jobs:
> > > (1) User-defined source parallelism.
> > > (2) Connector static parallelism inference.
> > > (3) Dynamic parallelism inference.
> > >
> > > Compared to manually setting parallelism, automatic parallelism
> inference
> > > is easier to use and can better adapt to varying data volumes each day.
> > > However, static parallelism inference cannot leverage runtime
> > information,
> > > resulting in inaccurate parallelism inference. Therefore, for batch
> jobs,
> > > dynamic parallelism inference is the most ideal, but currently the
> > > adaptive batch scheduler's support for it is not very comprehensive.
> > >
> > > Therefore, we aim to introduce a general interface that enables the
> > > adaptive batch scheduler to dynamically infer the source parallelism at
> > > runtime. Please refer to the FLIP[1] document for more details about
> the
> > > proposed design and implementation.
> > >
> > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> > > pre-discussion.
> > > Looking forward to your feedback and suggestions, thanks.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> > >
> > > Best regards,
> > > Xia
> > >
> >
>


[DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-10-31 Thread Xia Sun
Hi everyone,
I would like to start a discussion on FLIP-379: Dynamic source parallelism
inference for batch jobs[1].

In general, there are three main ways to set source parallelism for batch
jobs:
(1) User-defined source parallelism.
(2) Connector static parallelism inference.
(3) Dynamic parallelism inference.

Compared to manually setting parallelism, automatic parallelism inference
is easier to use and can better adapt to varying data volumes each day.
However, static parallelism inference cannot leverage runtime information,
resulting in inaccurate parallelism inference. Therefore, for batch jobs,
dynamic parallelism inference is the most ideal, but currently the adaptive
batch scheduler's support for it is not very comprehensive.

Therefore, we aim to introduce a general interface that enables the
adaptive batch scheduler to dynamically infer the source parallelism at
runtime. Please refer to the FLIP[1] document for more details about the
proposed design and implementation.
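
For readers who prefer not to open the wiki page, the proposed interface is
roughly shaped like the sketch below (paraphrased, so the FLIP document
remains authoritative; the upper-bound method name follows the rename agreed
elsewhere in this thread):

    public interface DynamicParallelismInference {

        /** Runtime information the adaptive batch scheduler offers to the source. */
        interface Context {

            /** Upper bound for the inferred parallelism, e.g. the vertex's maxParallelism. */
            int getParallelismInferenceUpperBound();

            /** Expected data volume per task, used to size the parallelism. */
            long getDataVolumePerTask();
        }

        /** Invoked by the scheduler at runtime to infer the source parallelism. */
        int inferParallelism(Context context);
    }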

I also thank Zhu Zhu and LiJie Wang for their suggestions during the
pre-discussion.
Looking forward to your feedback and suggestions, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia


[jira] [Created] (FLINK-32975) Enhance equal() for ChangelogMapState's iterator

2023-08-28 Thread Rui Xia (Jira)
Rui Xia created FLINK-32975:
---

 Summary: Enhance equal() for ChangelogMapState's iterator
 Key: FLINK-32975
 URL: https://issues.apache.org/jira/browse/FLINK-32975
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends, Tests
Reporter: Rui Xia








[jira] [Created] (FLINK-32957) Add current timer trigger lag to metrics

2023-08-25 Thread Rui Xia (Jira)
Rui Xia created FLINK-32957:
---

 Summary: Add current timer trigger lag to metrics
 Key: FLINK-32957
 URL: https://issues.apache.org/jira/browse/FLINK-32957
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Rui Xia








[jira] [Created] (FLINK-32954) Metrics expose number of heap timer

2023-08-24 Thread Rui Xia (Jira)
Rui Xia created FLINK-32954:
---

 Summary: Metrics expose number of heap timer
 Key: FLINK-32954
 URL: https://issues.apache.org/jira/browse/FLINK-32954
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Rui Xia








Re: [VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-24 Thread Xia Sun
+1 (non-binding)

Best Regards,

Xia

yuxia wrote on Sun, Jun 25, 2023 at 09:23:

> +1 (binding)
> Thanks Lijie driving it.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Yuepeng Pan" 
> To: "dev" 
> Sent: Saturday, June 24, 2023 9:06:53 PM
> Subject: Re:[VOTE] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>
> +1 (non-binding)
>
> Thanks,
> Yuepeng Pan
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-06-23 23:49:53, "Lijie Wang"  wrote:
> >Hi all,
> >
> >Thanks for all the feedback about the FLIP-324: Introduce Runtime Filter
> >for Flink Batch Jobs[1]. This FLIP was discussed in [2].
> >
> >I'd like to start a vote for it. The vote will be open for at least 72
> >hours (until June 29th 12:00 GMT) unless there is an objection or
> >insufficient votes.
> >
> >[1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >[2] https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg
> >
> >Best,
> >Lijie
>


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-12 Thread Xia Sun
Hi Yuxin,

Thanks for creating this FLIP!
I'm a Flink user, and in our internal scenario we use colocation to run
Flink jobs and online services together on the same machines. We found that
Flink jobs are occasionally affected by other non-Flink jobs (e.g., if the
host disk space is full, Flink jobs fail with a 'No space left on device'
error). This FLIP will really help us benefit from hybrid shuffle without
worrying about insufficient disk space.

And I also have a few questions.
1. If the same subpartition spans multiple different tiers, how do we keep
the order of segments between different storage tiers (if necessary)?
2. While writing a subpartition to the local disk, what will happen if the
disk space is found to be full? Will it report an error or automatically
switch to remote storage?
3. For remote storage, I noticed that it uses direct reading, which is
different from the other two tiers; does switching between different tiers
bring overhead or waiting? In addition, compared to Flink remote shuffle
services, which optimize data compression and small-file merging to improve
throughput and relieve file system pressure, can the object storage system
meet the performance requirements and concurrent access challenges of
large-scale batch jobs (parallelism > 1)?

Thanks,
Xia

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:

> Hi Yuxin,
>
> Thanks for creating this FLIP!
> The idea of tiered storage looks good. Instead of choosing one from
> multiple storages, it can help to balance between performance, cost and
> stability. It also has the potential to adaptively select proper tiers
> according to more runtime information, to achieve better performance
> and ease of use.
>
> I have a question about the tier finding of data reading. In the FLIP
> it proposes that the Read Client asks each storage tier whether a
> given segment exists in it, from higher priority tiers to lower priority
> ones. I'm a bit concerned about the cost of it, especially when data
> are written to low priority tiers. Do you have any evaluation of it?
> Is it possible to let the Read Client know the location of the next
> segment when it has finished reading one segment? Or maybe just let it
> know whether the next segment is located in the same tier, if we can
> have the assumption that tier changing would not be very frequent.
>
> Thanks,
> Zhu
>
> Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52:
> >
> > Thanks Yuxin for your explanation.
> >
> > That sounds reasonable. Looking forward to the new shuffle.
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan 
> wrote:
> >
> > > Hi, Weihua,
> > > Thanks for the questions and the ideas.
> > >
> > > > 1. How many performance regressions would there be if we only
> > > used remote storage?
> > >
> > > The new architecture can support using remote storage only, but this
> > > FLIP's target is to improve job stability. The change in the FLIP is
> > > already significantly complex, and the goal of the first version is to
> update
> > > Hybrid Shuffle to the new architecture and support remote storage as
> > > a supplement. The performance of this version is not the first
> priority,
> > > so we haven’t tested the performance of using only remote storage.
> > > If there are indeed regressions, we will keep optimizing the
> performance
> > > of the remote storages and improve it until only remote storage is
> > > available in the production environment.
> > >
> > > > 2. Shall we move the local data to remote storage if the producer is
> > > finished for a long time?
> > >
> > > I agree that it is a good idea, which can release task manager
> resources
> > > more timely. But moving data from TM local disk to remote storage needs
> > > more detailed discussion and design, and it is easier to implement it
> based
> > > on the new architecture. Considering the complexity, the target focus,
> and
> > > the iteration cycle of the FLIP, we decided that the details are not
> > > included in the first version. We will extend and implement them in the
> subsequent
> > > versions.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Weihua Hu wrote on Thu, Mar 9, 2023 at 11:22:
> > >
> > > > Hi, Yuxin
> > > >
> > > > Thanks for driving this FLIP.
> > > >
> > > > The remote storage shuffle could improve the stability of Batch jobs.
> > > >
> > > > In our internal scenario, we use a hybrid cluster to run

[jira] [Created] (FLINK-30235) Comprehensive benchmarks on changelog checkpointing

2022-11-28 Thread Rui Xia (Jira)
Rui Xia created FLINK-30235:
---

 Summary: Comprehensive benchmarks on changelog checkpointing
 Key: FLINK-30235
 URL: https://issues.apache.org/jira/browse/FLINK-30235
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Rui Xia


Changelog checkpointing is functionally usable right now. To make it a
production-ready feature, more comprehensive benchmarks are required. In this
issue, I aim to address the following two major concerns:
 * The expansion of the full checkpoint size caused by changelog persistence;
 * The TPS regression caused by the DSTL double-write.

Besides these, I will also present other metrics related to checkpointing.





[jira] [Created] (FLINK-21415) JDBC does not query data, and the cache will store an ArrayList without data

2021-02-19 Thread Shuai Xia (Jira)
Shuai Xia created FLINK-21415:
-

 Summary: JDBC does not query data, and the cache will store an 
ArrayList without data
 Key: FLINK-21415
 URL: https://issues.apache.org/jira/browse/FLINK-21415
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.12.1
Reporter: Shuai Xia


When the JDBC lookup does not return any data, the cache will store an empty
ArrayList.

We should add a size check.
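
A hypothetical sketch of that check (illustrative names, not the actual JDBC
lookup function code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class LookupCacheSketch<K, V> {

        private final Map<K, List<V>> cache = new HashMap<>();

        void onQueryResult(K key, List<V> rows) {
            if (!rows.isEmpty()) { // the missing size check
                cache.put(key, new ArrayList<>(rows));
            }
        }
    }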





Error with Flink-Gelly, lastJobExecutionResult is null

2020-08-02 Thread Xia Rui
Hello, everyone.

I am using Flink-Gelly. I got an error when running the Gelly example code.
I have reported the problem on Stack Overflow; this is the link:
https://stackoverflow.com/questions/63211746/error-with-flink-gelly-lastjobexecutionresult-is-null-for-executionenvironment

 

I am trying to figure out the error point. I traced the env.execute()
function, and it is actually invoked in ContextEnvironment::execute() (link:
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L71).
In fact, the variable lastJobExecutionResult (from ContextEnvironment's
superclass ExecutionEnvironment) is not set.

 

I checked the history of ContextEnvironment and found that
lastJobExecutionResult was excluded from ClusterClient in FLINK-14456
(https://issues.apache.org/jira/browse/FLINK-14456). This was merged to
master after Flink 1.10.

 

I was wondering if I could set the lastJobExecutionResult in
ContextEnvironment::execute() for my case (running Flink-Gelly on Flink >=
1.10) without significant side effects.

 

Thank you.



[jira] [Created] (FLINK-18147) Orc document display is disordered

2020-06-05 Thread Shuai Xia (Jira)
Shuai Xia created FLINK-18147:
-

 Summary: Orc document display is disordered
 Key: FLINK-18147
 URL: https://issues.apache.org/jira/browse/FLINK-18147
 Project: Flink
  Issue Type: Bug
  Components: Documentation, FileSystems
Affects Versions: 1.11.0
Reporter: Shuai Xia
 Fix For: 1.11.0


The official documentation page displays incorrectly.

link: 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html#tab_java_2]


