Hi all,
I discussed this offline with Zhu Zhu and Leonard Xu, and we have reached
consensus on the following three points:

1. Rename the interface method Context#getMaxSourceParallelism proposed by
the FLIP to Context#getParallelismInferenceUpperBound, to make the meaning
of the method clearer. See [1] for details.

2. We have added a more detailed explanation of the priority with which the
dynamic source parallelism inference proposed by this FLIP takes effect, and
of the order in which values are chosen for the upper bound of source
parallelism. We also point out the current support and limitations of the
AdaptiveBatchScheduler regarding source parallelism inference. See [2] for
details.

3. This FLIP will focus only on the framework-level implementation and will
prioritize implementing FileSource as an example of the new interface
proposed by the FLIP. The HiveSource, because of its existing static
parallelism inference and the changes in default values for configuration
items such as `table.exec.hive.infer-source-parallelism`, requires a more
detailed migration plan, as well as a more comprehensive design and
discussion. It is not suitable as part of this FLIP and needs a separate
FLIP. Therefore, we have removed the HiveSource part from this FLIP.
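
As a rough illustration of point 2, the sketch below encodes the rules as
they were described earlier in this thread: the upper bound is the minimum of
the default source parallelism (falling back to the global default
parallelism when it is not set) and the JobVertex's max parallelism, and
dynamic inference applies only when the graph is dynamic and the source
parallelism is -1. The class and method names are hypothetical and are not
Flink APIs, and capping the source's suggestion at the upper bound is an
assumption made for illustration. The FLIP page [2] remains the authoritative
description.

```java
import java.util.Optional;

/** Hypothetical sketch; none of these names exist in Flink. */
public final class SourceParallelismSketch {

    /** Sentinel used in this thread for "parallelism not specified". */
    public static final int UNSET = -1;

    private SourceParallelismSketch() {}

    /**
     * Upper bound offered to the source as a hint: the default source
     * parallelism if the user configured it, otherwise the global default
     * parallelism, limited by the vertex's max parallelism.
     */
    public static int inferenceUpperBound(
            Optional<Integer> defaultSourceParallelism,
            int globalDefaultParallelism,
            int vertexMaxParallelism) {
        int base = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(base, vertexMaxParallelism);
    }

    /**
     * Dynamic inference applies only when the graph is dynamic and neither
     * the user nor static inference has fixed the source parallelism.
     */
    public static boolean dynamicInferenceApplies(
            boolean isDynamicGraph, int sourceVertexParallelism) {
        return isDynamicGraph && sourceVertexParallelism == UNSET;
    }

    /** Assumption: the source's suggestion is capped at the bound, minimum 1. */
    public static int capInferredParallelism(int inferred, int upperBound) {
        return Math.max(1, Math.min(inferred, upperBound));
    }
}
```

For example, with no default source parallelism configured, a global default
of 8, and a vertex max parallelism of 128, `inferenceUpperBound` yields 8, and
a source suggesting 20 would then be capped to 8.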

Thanks again to everyone who participated in the discussion.
Looking forward to your continued feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges

Best,
Xia

Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 22, 2023 at 18:37:

> Thanks Xia for the reply, and sorry for my late response.
>
> > Thanks for pointing out the issue, the current wording does indeed seem to
> > be confusing. It involves the existing implementation of the
> > AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot
> > exceed the JobVertex's maxParallelism (which is typically set to either the
> > global default max parallelism or the user-specified JobVertex max
> > parallelism), so the FLIP maintains the logic. I have modified the FLIP to
> > avoid confusion as much as possible.
>
> I didn’t see the changed part in the FLIP; could you check it?
>
> > We can use Configuration::getOptional to check if the user has configured
> > `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
>
> Using ReadableConfig#getOptional(ConfigOption<T> option) makes sense to me.
>
> > As a follow-up task, we may have a dedicated discussion in the future to
> > see if we need to change the default value of
> > `table.exec.hive.infer-source-parallelism` to false. Before then, users can
> > manually set `table.exec.hive.infer-source-parallelism` to false to enable
> > dynamic parallelism inference, and use
> > `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
> > replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
> > inference upper bound. I have updated both the DynamicParallelismInference
> > interface implementation and the Migration Plan sections of the FLIP to
> > illustrate this.
>
> In my opinion, moving HiveSource to a subsequent discussion is not OK. My
> reasoning: HiveSource support for dynamic source parallelism inference is
> one part of the FLIP implementation, and it looks like we are introducing a
> configuration option,
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`,
> which conflicts with the existing option
> `table.exec.hive.infer-source-parallelism`. We do not provide a conflict
> resolution in the current FLIP and just postpone the work to future
> discussions. Judging from past community experience, this uncertain state
> is likely to cause the follow-up discussions to be shelved. I’d suggest we
> make this part clear in this FLIP.
>
>
> Best,
> Leonard
>
>
> >
> >
> > Leonard Xu <xbjt...@gmail.com> wrote on Thu, Nov 16, 2023 at 12:36:
> >
> >> Thanks Xia for the detailed reply.
> >>
> >>>> `How do users disable the parallelism inference if they want to use a
> >>>> fixed source parallelism?`
> >>>> `Could you explain the priority between the static parallelism set from
> >>>> the table layer and the proposed dynamic source parallelism?`
> >>>
> >>> From the user's perspective, if the user specifies a fixed parallelism for
> >>> the source, dynamic source parallelism inference will be automatically
> >>> disabled. In terms of priority: the user-specified parallelism > static
> >>> parallelism inference > dynamic parallelism inference. This is because
> >>> dynamic source parallelism inference takes effect at the runtime stage,
> >>> and only when both of the following conditions hold: (1) the current
> >>> ExecutionGraph is a dynamic graph, and (2) the parallelism of the source
> >>> vertex is not specified (that is, the parallelism is -1).
> >>
> >> The priority explanation makes sense to me. Could you also add this
> >> explanation to the FLIP?
> >>
> >>>> `So, could we consider the boundedness info when designing the interface?
> >>>> Both FileSource and Hive Source offer streaming read ability. Imagine this
> >>>> case: a Flink streaming Hive source should not apply the dynamic source
> >>>> parallelism inference, even if it implements the feature, as it is serving
> >>>> a streaming job.`
> >>>
> >>> Thanks for your feedback, it is really a good input. Currently, the
> >>> dynamic parallelism inference logic is only triggered in batch jobs.
> >>> Therefore, the logic will not be called in streaming jobs.
> >>> In the future, if streaming jobs also support runtime parallelism
> >>> inference, then theoretically, the source can no longer be distinguished
> >>> between streaming jobs and batch jobs at the runtime stage. In addition,
> >>> since the new interface is implemented together with the Source interface,
> >>> the Source::getBoundedness() method can also be called when inferring
> >>> parallelism.
> >>
> >> +1 about the boundedness info part.
> >>
> >> Okay, let’s come back to the batch world and discuss some details of the
> >> current design:
> >>
> >> (1) About max source parallelism
> >> The FLIP said: (1) Max source parallelism, which is calculated as the
> >> minimum of the default source parallelism
> >> (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`)
> >> and JobVertex#maxParallelism. If the default-source-parallelism is not set,
> >> the global default parallelism is used as the default source parallelism.
> >>
> >> The 'Max source parallelism' is the information that the runtime offers to
> >> the Source as a hint for inferring the actual parallelism. A name with a
> >> "max" prefix that is calculated as a minimum confuses me a lot, especially
> >> when I read the HiveSource pseudocode:
> >>
> >> fileEnumerator.setMinNumSplits(maxSourceParallelism);
> >>
> >> Although I understand that naming is a complex topic in CS, could we
> >> improve this method name a little? Also, the
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >> config has a default value of 1; how do we distinguish whether the user
> >> has set it or not, for example if the user happens to set the value 1?
> >>
> >> (2) About changing the default value of
> >> 'table.exec.hive.infer-source-parallelism'
> >> The FLIP said: As a follow-up task, we will consider changing the default
> >> value of 'table.exec.hive.infer-source-parallelism' to false.
> >>
> >> There is no doubt that this is an API-breaking change. For existing Hive
> >> users, the migration path is not clear in this FLIP. For example, current
> >> users rely on the number of splits to infer the source parallelism; after
> >> this FLIP, could we give a recommended value for
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`,
> >> explain how to set it, or state that users do not even need to set
> >> anything? And the replacement for migration should be added to the
> >> description of 'table.exec.hive.infer-source-parallelism' when we propose
> >> to change its default value, right?
> >>
> >> (3) [minor] About the HiveSource
> >> The pseudocode shows:
> >>
> >> fileEnumerator.getInferredSourceParallelsim();
> >>
> >> IIRC, our public API FileEnumerator never offered such a method. Is
> >> introducing getInferredSourceParallelsim() also part of our FLIP?
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>
> >>>
> >>> Best regards,
> >>> Xia
> >>>
> >>> Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 8, 2023 at 16:19:
> >>>
> >>>> Thanks Xia and Zhu Zhu for kicking off this discussion.
> >>>>
> >>>> Dynamic source parallelism inference is a useful feature for the batch
> >>>> story. I have some comments about the current design.
> >>>>
> >>>> 1. How do users disable the parallelism inference if they want to use a
> >>>> fixed source parallelism? They can currently configure a fixed parallelism
> >>>> in the table layer, as you explained above.
> >>>>
> >>>> 2. Could you explain the priority between the static parallelism set from
> >>>> the table layer and the proposed dynamic source parallelism? Also, changing
> >>>> the default value of `table.exec.hive.infer-source-parallelism` as a
> >>>> sub-task does not resolve all cases, because other sources can set their
> >>>> own parallelism too.
> >>>>
> >>>> 3. The current design only works for batch jobs. The workflow for a
> >>>> streaming job may look like: (1) infer parallelism for a streaming source
> >>>> like Kafka, (2) stop the job with a savepoint, (3) apply the new
> >>>> parallelism to the job, (4) schedule the streaming job from the savepoint.
> >>>> That is totally different, and the latter lacks a lot of infrastructure in
> >>>> Flink, right? So, could we consider the boundedness info when designing
> >>>> the interface? Both FileSource and Hive Source offer streaming read
> >>>> ability. Imagine this case: a Flink streaming Hive source should not apply
> >>>> the dynamic source parallelism inference, even if it implements the
> >>>> feature, as it is serving a streaming job.
> >>>>
> >>>> Best,
> >>>> Leonard
> >>>>
> >>>>
> >>>>> On Nov 1, 2023, at 6:21 PM, Xia Sun <xingbe...@gmail.com> wrote:
> >>>>>
> >>>>> Thanks Lijie for the comments!
> >>>>> 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> >>>>> superset of static parallelism inference. As a follow-up task, we can
> >>>>> consider changing the default value of
> >>>>> 'table.exec.hive.infer-source-parallelism' to false.
> >>>>>
> >>>>> 2. I think that both dynamic parallelism inference and static parallelism
> >>>>> inference have their own use cases. Currently, for streaming sources and
> >>>>> other sources that are not sensitive to dynamic information, the benefits
> >>>>> of dynamic parallelism inference may not be significant. In such cases, we
> >>>>> can continue to use static parallelism inference.
> >>>>>
> >>>>> Thanks,
> >>>>> Xia
> >>>>>
> >>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Nov 1, 2023 at 14:52:
> >>>>>
> >>>>>> Hi Xia,
> >>>>>>
> >>>>>> Thanks for driving this FLIP, +1 for the proposal.
> >>>>>>
> >>>>>> I have 2 questions about the relationship between static inference and
> >>>>>> dynamic inference:
> >>>>>>
> >>>>>> 1. AFAIK, the Hive table source currently enables static inference by
> >>>>>> default. In this case, which one (static vs. dynamic) will take effect? I
> >>>>>> think it would be better if we could point this out in the FLIP.
> >>>>>>
> >>>>>> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> >>>>>> we have a plan to deprecate static inference in the future?
> >>>>>>
> >>>>>> Best,
> >>>>>> Lijie
> >>>>>>
> >>>>>> Zhu Zhu <reed...@gmail.com> wrote on Tue, Oct 31, 2023 at 20:19:
> >>>>>>
> >>>>>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>>>>>> The proposed changes make up an important missing part of the dynamic
> >>>>>>> parallelism inference of the adaptive batch scheduler.
> >>>>>>>
> >>>>>>> Besides that, it is also one good step towards supporting dynamic
> >>>>>>> parallelism inference for streaming sources, e.g. allowing Kafka
> >>>>>>> sources to determine their parallelism automatically based on the
> >>>>>>> number of partitions.
> >>>>>>>
> >>>>>>> +1 for the proposal.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Zhu
> >>>>>>>
> >>>>>>> Xia Sun <xingbe...@gmail.com> wrote on Tue, Oct 31, 2023 at 16:01:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>> I would like to start a discussion on FLIP-379: Dynamic source
> >>>>>>>> parallelism inference for batch jobs [1].
> >>>>>>>>
> >>>>>>>> In general, there are three main ways to set source parallelism for
> >>>>>>>> batch jobs:
> >>>>>>>> (1) User-defined source parallelism.
> >>>>>>>> (2) Connector static parallelism inference.
> >>>>>>>> (3) Dynamic parallelism inference.
> >>>>>>>>
> >>>>>>>> Compared to manually setting parallelism, automatic parallelism
> >>>>>>>> inference is easier to use and can better adapt to data volumes that
> >>>>>>>> vary from day to day. However, static parallelism inference cannot
> >>>>>>>> leverage runtime information, resulting in inaccurate parallelism
> >>>>>>>> inference. Therefore, for batch jobs, dynamic parallelism inference is
> >>>>>>>> the most ideal, but currently the adaptive batch scheduler's support
> >>>>>>>> for it is not very comprehensive.
> >>>>>>>>
> >>>>>>>> Therefore, we aim to introduce a general interface that enables the
> >>>>>>>> adaptive batch scheduler to dynamically infer the source parallelism at
> >>>>>>>> runtime. Please refer to the FLIP document [1] for more details about
> >>>>>>>> the proposed design and implementation.
> >>>>>>>>
> >>>>>>>> I also thank Zhu Zhu and Lijie Wang for their suggestions during the
> >>>>>>>> pre-discussion.
> >>>>>>>> Looking forward to your feedback and suggestions, thanks.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Xia
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>
