Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is no further discussion.
Best,
Xia

Leonard Xu <xbjt...@gmail.com> wrote on Fri, Nov 24, 2023, 18:50:

> Thanks Xia and Zhu Zhu for driving this work,
>
> It will help unify the parallelism inference for all operators of batch jobs; the updated FLIP looks good to me.
>
> Best,
> Leonard
>
> > On Nov 24, 2023, at 5:53 PM, Xia Sun <xingbe...@gmail.com> wrote:
> >
> > Hi all,
> > We discussed offline with Zhu Zhu and Leonard Xu and have reached the following three points of consensus:
> >
> > 1. Rename the interface method Context#getMaxSourceParallelism proposed by the FLIP to Context#getParallelismInferenceUpperBound, to make the meaning of the method clearer. See [1] for details.
> >
> > 2. We provide a more detailed explanation of the effective priority of the dynamic source parallelism inference proposed by this FLIP and of how the upper bound of source parallelism is determined. We also point out the current support and limitations of the AdaptiveBatchScheduler regarding source parallelism inference. See [2] for details.
> >
> > 3. This FLIP will only focus on the framework-level implementation and will prioritize implementing FileSource as an example of the new interface proposed by the FLIP. The HiveSource, due to its existing static parallelism inference and the changes in default values of configuration items such as `table.exec.hive.infer-source-parallelism`, requires a more detailed migration plan as well as more comprehensive design and discussion. It is not suitable as part of this FLIP and needs a separate FLIP. Therefore, we have removed the HiveSource part from this FLIP.
> >
> > Thanks again to everyone who participated in the discussion.
> > Looking forward to your continued feedback.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges
> >
> > Best,
> > Xia
> >
> > Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 22, 2023, 18:37:
> >
> >> Thanks Xia for the reply, and sorry for the late response.
> >>
> >>> Thanks for pointing out the issue; the current wording does indeed seem to be confusing. It involves the existing implementation of the AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot exceed the JobVertex's maxParallelism (which is typically set to either the global default max parallelism or the user-specified JobVertex max parallelism), so the FLIP maintains this logic. I have modified the FLIP to avoid confusion as much as possible.
> >>
> >> I didn't see the changed part in this FLIP, could you check it?
> >>
> >>> We can use Configuration::getOptional to check if the user has configured `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
> >>
> >> Using Readable#getOptional(ConfigOption<T> option) makes sense to me.
> >>
> >>> As a follow-up task, we may have a dedicated discussion in the future to see if we need to change the default value of `table.exec.hive.infer-source-parallelism` to false.
> >>> Before then, users can manually set `table.exec.hive.infer-source-parallelism` to false to enable dynamic parallelism inference, and use `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to replace `table.exec.hive.infer-source-parallelism.max` as the parallelism inference upper bound. I have updated both the FLIP's DynamicParallelismInference interface implementation and Migration Plan sections to illustrate this.
> >>
> >> In my opinion, moving HiveSource to a subsequent discussion is not OK; let me explain: making HiveSource support dynamic source parallelism inference is one part of the FLIP implementation. It looks like we introduce a configuration `execution.batch.adaptive.auto-parallelism.default-source-parallelism` that conflicts with the existing configuration `table.exec.hive.infer-source-parallelism`, but we do not provide conflict resolution in the current FLIP and just postpone the work to future discussions. From past community work experience, this uncertain state is likely to cause the subsequent discussion to be shelved. I'd suggest we make this part clear in this FLIP.
> >>
> >> Best,
> >> Leonard
> >>
> >>> Leonard Xu <xbjt...@gmail.com> wrote on Thu, Nov 16, 2023, 12:36:
> >>>
> >>>> Thanks Xia for the detailed reply.
> >>>>
> >>>>>> `How does the user disable the parallelism inference if they want to use a fixed source parallelism?`
> >>>>>> `Could you explain the priority of the static parallelism set from the table layer versus the proposed dynamic source parallelism?`
> >>>>>
> >>>>> From the user's perspective, if the user specifies a fixed parallelism for the source, dynamic source parallelism inference will be automatically disabled. In terms of priority: the user's specified parallelism > static parallelism inference > dynamic parallelism inference.
> >>>>> This is because dynamic source parallelism inference takes effect at the runtime stage, and its validity conditions are: (1) the current ExecutionGraph is a dynamic graph, and (2) the parallelism of the source vertex is not specified (that is, the parallelism is -1).
> >>>>
> >>>> The priority explanation makes sense to me; could you also add this explanation to the FLIP?
> >>>>
> >>>>>> `So, could we consider the boundedness info when designing the interface? Both FileSource and HiveSource offer streaming read ability. Imagine this case: a Flink streaming Hive source should not apply the dynamic source parallelism, even if it implemented the feature, as it is serving a streaming job.`
> >>>>>
> >>>>> Thanks for your feedback, it is really good input. Currently, the dynamic parallelism inference logic is only triggered in batch jobs, so the logic will not be called in streaming jobs.
> >>>>> In the future, if streaming jobs also support runtime parallelism inference, then theoretically the source can no longer be distinguished between streaming jobs and batch jobs at the runtime stage. In addition, since the new interface is implemented together with the Source interface, the Source::getBoundedness() method can also be consulted when inferring parallelism.
> >>>>
> >>>> +1 on the boundedness info part.
> >>>>
> >>>> Okay, let's come back to the batch world and discuss some details of the current design:
> >>>>
> >>>> (1) About max source parallelism
> >>>> The FLIP says: (1) Max source parallelism, which is calculated as the minimum of the default source parallelism (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism.
> >>>> If the default-source-parallelism is not set, the global default parallelism is used as the default source parallelism.
> >>>>
> >>>> The 'max source parallelism' is the information the runtime offers to the Source as a hint to infer the actual parallelism. A name with a "max" prefix that is calculated as a minimum value confuses me a lot, especially when I read the HiveSource pseudocode:
> >>>>
> >>>> fileEnumerator.setMinNumSplits(maxSourceParallelism);
> >>>>
> >>>> Although I understand that naming is a complex topic in CS, could we improve this method name a little? Also, the `execution.batch.adaptive.auto-parallelism.default-source-parallelism` config has a default value of 1; how do we distinguish whether the user has set it or not, for example if the user happens to set the value 1?
> >>>>
> >>>> (2) About changing the default value of 'table.exec.hive.infer-source-parallelism'
> >>>> The FLIP says: As a follow-up task, we will consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false.
> >>>>
> >>>> No doubt this is an API-breaking change, and for existing Hive users the migration path is not clear in this FLIP. For example, current users use the number of splits to infer the source parallelism; after this FLIP, could we give a recommended value for `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, explain how to set it, or state that users do not need to set anything at all? And the migration replacement should be added to the description of 'table.exec.hive.infer-source-parallelism' when we propose to change its default value, right?
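[Editor's note] The "user set it or not" question above hinges on the difference between a present and an absent config value. A minimal sketch of the idea, assuming a map-backed stand-in class (Flink's real `Configuration#getOptional(ConfigOption)` behaves analogously, but `ConfigSketch` here is purely illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative stand-in for a configuration object; not Flink's actual class.
class ConfigSketch {
    private final Map<String, Integer> values = new HashMap<>();

    public void set(String key, int value) {
        values.put(key, value);
    }

    // Returns empty when the key was never set, which lets the scheduler
    // tell "user explicitly set 1" apart from "fell back to the default 1".
    public Optional<Integer> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }
}
```

With this, the scheduler can fall back to the global default parallelism only when `getOptional` returns empty, instead of comparing against the default value 1.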
> >>>>
> >>>> (3) [minor] About the HiveSource
> >>>> The pseudocode shows:
> >>>>
> >>>> fileEnumerator.getInferredSourceParallelism();
> >>>>
> >>>> IIRC, our public API FileEnumerator never offered such a method; is introducing getInferredSourceParallelism() also one part of our FLIP?
> >>>>
> >>>> Best,
> >>>> Leonard
> >>>>
> >>>>> Best regards,
> >>>>> Xia
> >>>>>
> >>>>> Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 8, 2023, 16:19:
> >>>>>
> >>>>>> Thanks Xia and Zhu Zhu for kicking off this discussion.
> >>>>>>
> >>>>>> The dynamic source parallelism inference is a useful feature for the batch story. I have some comments about the current design.
> >>>>>>
> >>>>>> 1. How does the user disable the parallelism inference if they want to use a fixed source parallelism? They can currently configure a fixed parallelism in the table layer, as you explained above.
> >>>>>>
> >>>>>> 2. Could you explain the priority of the static parallelism set from the table layer versus the proposed dynamic source parallelism? Also, changing the default value of `table.exec.hive.infer-source-parallelism` as a sub-task does not resolve all cases, because other Sources can set their own parallelism too.
> >>>>>>
> >>>>>> 3. The current design only works for batch jobs. The workflow for a streaming job may look like: (1) infer the parallelism for a streaming source like Kafka, (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) schedule the streaming job from the savepoint. That is totally different, and the latter lacks a lot of infrastructure in Flink, right? So, could we consider the boundedness info when designing the interface?
> >>>>>> Both FileSource and HiveSource offer streaming read ability. Imagine this case: a Flink streaming Hive source should not apply the dynamic source parallelism, even if it implemented the feature, as it is serving a streaming job.
> >>>>>>
> >>>>>> Best,
> >>>>>> Leonard
> >>>>>>
> >>>>>>> On Nov 1, 2023, at 6:21 PM, Xia Sun <xingbe...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Thanks Lijie for the comments!
> >>>>>>> 1. For the Hive source, dynamic parallelism inference in batch scenarios is a superset of static parallelism inference. As a follow-up task, we can consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false.
> >>>>>>>
> >>>>>>> 2. I think that both dynamic parallelism inference and static parallelism inference have their own use cases. Currently, for streaming sources and other sources that are not sensitive to dynamic information, the benefits of dynamic parallelism inference may not be significant. In such cases, we can continue to use static parallelism inference.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Xia
> >>>>>>>
> >>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Nov 1, 2023, 14:52:
> >>>>>>>
> >>>>>>>> Hi Xia,
> >>>>>>>>
> >>>>>>>> Thanks for driving this FLIP, +1 for the proposal.
> >>>>>>>>
> >>>>>>>> I have 2 questions about the relationship between static inference and dynamic inference:
> >>>>>>>>
> >>>>>>>> 1. AFAIK, the Hive table source currently enables static inference by default. In this case, which one (static vs. dynamic) will take effect? I think it would be better if we could point this out in the FLIP.
> >>>>>>>>
> >>>>>>>> 2. As you mentioned above, dynamic inference is the most ideal way, so do we have a plan to deprecate the static inference in the future?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lijie
> >>>>>>>>
> >>>>>>>> Zhu Zhu <reed...@gmail.com> wrote on Tue, Oct 31, 2023, 20:19:
> >>>>>>>>
> >>>>>>>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>>>>>>>> The proposed changes make up an important missing part of the dynamic parallelism inference of the adaptive batch scheduler.
> >>>>>>>>>
> >>>>>>>>> Besides that, it is also a good step towards supporting dynamic parallelism inference for streaming sources, e.g. allowing Kafka sources to determine their parallelism automatically based on the number of partitions.
> >>>>>>>>>
> >>>>>>>>> +1 for the proposal.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Zhu
> >>>>>>>>>
> >>>>>>>>> Xia Sun <xingbe...@gmail.com> wrote on Tue, Oct 31, 2023, 16:01:
> >>>>>>>>>
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>> I would like to start a discussion on FLIP-379: Dynamic source parallelism inference for batch jobs [1].
> >>>>>>>>>>
> >>>>>>>>>> In general, there are three main ways to set source parallelism for batch jobs:
> >>>>>>>>>> (1) User-defined source parallelism.
> >>>>>>>>>> (2) Connector static parallelism inference.
> >>>>>>>>>> (3) Dynamic parallelism inference.
> >>>>>>>>>>
> >>>>>>>>>> Compared to manually setting parallelism, automatic parallelism inference is easier to use and can better adapt to varying data volumes each day. However, static parallelism inference cannot leverage runtime information, resulting in inaccurate parallelism inference. Therefore, for batch jobs, dynamic parallelism inference is the most ideal, but currently the adaptive batch scheduler's support for it is not very comprehensive.
> >>>>>>>>>>
> >>>>>>>>>> Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime. Please refer to the FLIP [1] document for more details about the proposed design and implementation.
> >>>>>>>>>>
> >>>>>>>>>> I also thank Zhu Zhu and Lijie Wang for their suggestions during the pre-discussion.
> >>>>>>>>>> Looking forward to your feedback and suggestions, thanks.
> >>>>>>>>>>
> >>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>> Xia
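[Editor's note] The decision order discussed throughout the thread (user-specified parallelism wins; otherwise dynamic inference applies, capped by an upper bound that is the minimum of the configured default source parallelism, or the global default when unset, and the vertex's max parallelism) can be sketched as follows. The class and method names are illustrative only, not the actual scheduler API:

```java
import java.util.OptionalInt;

// Illustrative sketch of the parallelism decision order described in the
// thread; not Flink's actual scheduler code.
class SourceParallelismSketch {

    // Upper bound for dynamic inference: the configured default source
    // parallelism (or the global default if unset), capped by the
    // JobVertex's max parallelism.
    static int inferenceUpperBound(OptionalInt defaultSourceParallelism,
                                   int globalDefaultParallelism,
                                   int vertexMaxParallelism) {
        int base = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(base, vertexMaxParallelism);
    }

    // Priority: a user-specified parallelism (anything other than -1) wins;
    // otherwise the dynamically inferred value is used, clamped to the bound.
    static int effectiveParallelism(int userSpecified,
                                    int dynamicallyInferred,
                                    int upperBound) {
        if (userSpecified != -1) {
            return userSpecified;
        }
        return Math.min(dynamicallyInferred, upperBound);
    }
}
```

For example, with no user-set parallelism (-1), a dynamically inferred value of 40, and an upper bound of 32, the effective parallelism would be 32.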