Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is no further discussion.

Best,
Xia

Leonard Xu wrote on Fri, Nov 24, 2023, at 18:50:
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Xia and Zhu Zhu for driving this work. It will help unify the parallelism inference for all operators of a batch job; the updated FLIP looks good to me.

Best,
Leonard

Xia Sun wrote on Fri, Nov 24, 2023, at 5:53 PM:
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi all,

I discussed offline with Zhu Zhu and Leonard Xu, and we reached the following three points of consensus:

1. Rename the interface method Context#getMaxSourceParallelism proposed by the FLIP to Context#getParallelismInferenceUpperBound, to make the meaning of the method clearer. See [1] for details.

2. We provide a more detailed explanation of the effective priority of the dynamic source parallelism inference proposed by this FLIP, and of the order in which values are taken for the upper bound of source parallelism. We also point out the current support and limitations of the AdaptiveBatchScheduler regarding source parallelism inference. See [2] for details.

3. This FLIP will only focus on the framework-level implementation and will prioritize the implementation of FileSource as an example of the new interface proposed by the FLIP. The HiveSource, due to its existing static parallelism inference and the changes in default values for configuration items such as `table.exec.hive.infer-source-parallelism`, requires a more detailed migration plan as well as a more comprehensive design and discussion. It is not suitable as part of this FLIP and needs a separate FLIP. Therefore, we have removed the HiveSource part from this FLIP.

Thanks again to everyone who participated in the discussion. Looking forward to your continued feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges

Best,
Xia

Leonard Xu wrote on Wed, Nov 22, 2023, at 18:37:
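Point 1 above renames the inference hook. The shape under discussion can be pictured with a short, self-contained sketch; the type and method names follow the thread (DynamicParallelismInference, Context#getParallelismInferenceUpperBound), but the signatures and the split-counting source below are illustrative assumptions, not Flink's actual API:

```java
// Hypothetical mirror of the FLIP-379 interface shapes; illustrative only.
interface DynamicParallelismInference {
    int inferParallelism(Context context);

    interface Context {
        // Renamed from getMaxSourceParallelism per point 1 of the consensus.
        int getParallelismInferenceUpperBound();
    }
}

// Toy source that wants one subtask per split, capped by the upper bound.
class SplitCountingSource implements DynamicParallelismInference {
    private final int splitCount;

    SplitCountingSource(int splitCount) {
        this.splitCount = splitCount;
    }

    @Override
    public int inferParallelism(Context context) {
        return Math.min(splitCount, context.getParallelismInferenceUpperBound());
    }
}

public class Flip379Sketch {
    public static int infer(int splits, int upperBound) {
        // Context has a single method, so a lambda can supply the bound.
        return new SplitCountingSource(splits).inferParallelism(() -> upperBound);
    }

    public static void main(String[] args) {
        System.out.println(infer(500, 128)); // many splits: capped at the bound
        System.out.println(infer(8, 128));   // few splits: few subtasks
    }
}
```

The rename matters because the value is a cap supplied by the runtime, not a target: the source may return anything up to it.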
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Xia for the reply, and sorry for my late reply.

> Thanks for pointing out the issue, the current wording does indeed seem to be confusing. It involves the existing implementation of the AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot exceed the JobVertex's maxParallelism (which is typically set to either the global default max parallelism or the user-specified JobVertex max parallelism), so the FLIP maintains that logic. I have modified the FLIP to avoid confusion as much as possible.

I didn't see the changed part in this FLIP, could you check it?

> We can use Configuration::getOptional to check if the user has configured `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.

Using Readable#getOptional(ConfigOption option) makes sense to me.

> As a follow-up task, we may have a dedicated discussion in the future to see if we need to change the default value of `table.exec.hive.infer-source-parallelism` to false. Before then, users can manually set `table.exec.hive.infer-source-parallelism` to false to enable dynamic parallelism inference, and use `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to replace `table.exec.hive.infer-source-parallelism.max` as the parallelism inference upper bound. I have updated both the FLIP's DynamicParallelismInference interface implementation and Migration Plan modules to illustrate this.

In my opinion, moving HiveSource to a subsequent discussion is not OK. My explanation: making HiveSource support dynamic source parallelism inference is one part of the FLIP implementation. It looks like we introduce a configuration `execution.batch.adaptive.auto-parallelism.default-source-parallelism` that conflicts with the existing configuration `table.exec.hive.infer-source-parallelism`, but we do not provide conflict resolution in the current FLIP and just postpone the work to future discussions. From past community work experience, this uncertain state is likely to cause the subsequent discussion to be shelved. I'd suggest we make this part clear in this FLIP.

Best,
Leonard
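The conflict Leonard describes can be made concrete. Below is a purely illustrative sketch of the precedence Xia outlines elsewhere in the thread: while `table.exec.hive.infer-source-parallelism` stays true (its current default), Hive's static inference wins, capped by `table.exec.hive.infer-source-parallelism.max`; setting it to false hands control to dynamic inference, whose cap comes from `execution.batch.adaptive.auto-parallelism.default-source-parallelism`. The string-map config stand-in and the fallback constants are assumptions for the sketch, not Flink's real Configuration:

```java
import java.util.Map;

// Illustrative precedence between Hive static inference and the proposed
// dynamic inference; not Flink code.
public class SourceParallelismPrecedence {
    public enum Mode { HIVE_STATIC, DYNAMIC }

    // Hive static inference stays in charge while its switch is true.
    public static Mode resolve(Map<String, String> conf) {
        boolean hiveStatic = Boolean.parseBoolean(
                conf.getOrDefault("table.exec.hive.infer-source-parallelism", "true"));
        return hiveStatic ? Mode.HIVE_STATIC : Mode.DYNAMIC;
    }

    // Which option caps the inferred parallelism under each mode.
    public static int upperBound(Map<String, String> conf, int globalDefault) {
        if (resolve(conf) == Mode.HIVE_STATIC) {
            // 1000 is the assumed default of the .max option in this sketch.
            return Integer.parseInt(conf.getOrDefault(
                    "table.exec.hive.infer-source-parallelism.max", "1000"));
        }
        return Integer.parseInt(conf.getOrDefault(
                "execution.batch.adaptive.auto-parallelism.default-source-parallelism",
                String.valueOf(globalDefault)));
    }

    public static void main(String[] args) {
        // Out of the box: Hive static inference wins.
        System.out.println(resolve(Map.of()) + " / " + upperBound(Map.of(), 256));
        // Opting into dynamic inference, capped by default-source-parallelism.
        Map<String, String> dynamic = Map.of(
                "table.exec.hive.infer-source-parallelism", "false",
                "execution.batch.adaptive.auto-parallelism.default-source-parallelism", "64");
        System.out.println(resolve(dynamic) + " / " + upperBound(dynamic, 256));
    }
}
```

Writing the precedence down this explicitly is essentially what Leonard is asking the FLIP to do.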
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Leonard for the detailed feedback and input.

> The 'Max source parallelism' is the information that the runtime offers to the Source as a hint to infer the actual parallelism; a name with a max prefix but calculated as a minimum value confuses me a lot, especially when I read the HiveSource pseudocode:
>
> fileEnumerator.setMinNumSplits(maxSourceParallelism);
>
> Although I understand that naming is a complex topic in CS, could we improve this method name a little?

Thanks for pointing out the issue, the current wording does indeed seem to be confusing. It involves the existing implementation of the AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot exceed the JobVertex's maxParallelism (which is typically set to either the global default max parallelism or the user-specified JobVertex max parallelism), so the FLIP maintains that logic. I have modified the FLIP to avoid confusion as much as possible.

> And, the `execution.batch.adaptive.auto-parallelism.default-source-parallelism` config has a default value of 1; how do we distinguish whether the user set it or not, for example if the user happens to set the value 1?

We can use Configuration::getOptional to check if the user has configured `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.

> No doubt that it's an API breaking change, and for existing Hive users the migration path is not clear in this FLIP. For example, current users used the number of splits to infer the source parallelism; after this FLIP, could we give a recommended value for `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or explain how to set it, or do users not need to set anything at all? And the replacement for migration should be added to `table.exec.hive.infer-source-parallelism`'s description when we propose to change its default value, right?

As a follow-up task, we may have a dedicated discussion in the future to see if we need to change the default value of `table.exec.hive.infer-source-parallelism` to false. Before then, users can manually set `table.exec.hive.infer-source-parallelism` to false to enable dynamic parallelism inference, and use `execution.batch.adaptive.auto-parallelism.default-source-parallelism` to replace `table.exec.hive.infer-source-parallelism.max` as the parallelism inference upper bound. I have updated both the FLIP's DynamicParallelismInference interface implementation and Migration Plan modules to illustrate this.

> The pseudocode shows:
>
> fileEnumerator.getInferredSourceParallelsim();
>
> IIRC, our public API FileEnumerator never offers such a method; is introducing getInferredSourceParallelsim() also one part of our FLIP?

The intent was to make the pseudocode easier to understand, but it did introduce some confusion. There are no plans to introduce getInferredSourceParallelism() in HiveSource, and I've modified the HiveSource pseudocode in the FLIP.

Best regards,
Xia

Leonard Xu wrote on Thu, Nov 16, 2023, at 12:36:
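Xia's getOptional point is about distinguishing "unset" from "explicitly set to the default". The sketch below mimics (rather than imports) the relevant slice of a Flink-style Configuration to show why a plain get-with-default cannot make that distinction while getOptional can; the class here is a stand-in, not Flink's actual Configuration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Minimal stand-in for a typed config store, illustrating set-vs-default.
public class ConfigSketch {
    private final Map<String, Integer> values = new HashMap<>();

    public void set(String key, int value) {
        values.put(key, value);
    }

    // get-with-default: loses the set/unset distinction.
    public int get(String key, int defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }

    // getOptional: empty means "the user never configured it".
    public Optional<Integer> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }

    public static void main(String[] args) {
        String key =
                "execution.batch.adaptive.auto-parallelism.default-source-parallelism";
        ConfigSketch unset = new ConfigSketch();
        ConfigSketch setToOne = new ConfigSketch();
        setToOne.set(key, 1);

        // Both return 1 via get-with-default (the option's default is 1)...
        System.out.println(unset.get(key, 1) + " vs " + setToOne.get(key, 1));
        // ...but getOptional still tells them apart.
        System.out.println(unset.getOptional(key).isPresent());
        System.out.println(setToOne.getOptional(key).isPresent());
    }
}
```

This is exactly the case Leonard raised: a user who deliberately sets the option to 1 must not be treated as if the option were unconfigured.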
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Xia for the detailed reply.

>> `How does a user disable the parallelism inference if they want to use a fixed source parallelism?`
>> `Could you explain the priority of the static parallelism set from the table layer versus the proposed dynamic source parallelism?`
>
> From the user's perspective, if the user specifies a fixed parallelism for the source, dynamic source parallelism inference will be automatically disabled. From the perspective of priority: the user's specified parallelism > the static parallelism inference > dynamic parallelism inference. This is because the dynamic source parallelism inference takes effect at the runtime stage, and its validity conditions are: (1) the current ExecutionGraph is a dynamic graph, and (2) the parallelism of the source vertex is not specified (that is, the parallelism is -1).

The priority explanation makes sense to me; could you also add this explanation to the FLIP?

>> `So, could we consider the boundedness info when designing the interface? Both FileSource and HiveSource offer streaming read ability. Imagine this case: a Flink streaming HiveSource should not apply the dynamic source parallelism even if it implemented the feature, as it is serving a streaming job.`
>
> Thanks for your feedback, it is really a good input. Currently, the dynamic parallelism inference logic is only triggered in batch jobs. Therefore, the logic will not be called in streaming jobs. In the future, if streaming jobs also support runtime parallelism inference, then theoretically the source can no longer be distinguished between streaming jobs and batch jobs at the runtime stage. In addition, since the new interface is implemented together with the Source interface, the Source::getBoundedness() method can also be consulted when inferring parallelism.

+1 on the boundedness info part.

Okay, let's come back to the batch world and discuss some details of the current design:

(1) About max source parallelism

The FLIP says: (1) Max source parallelism, which is calculated as the minimum of the default source parallelism (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`) and JobVertex#maxParallelism. If the default-source-parallelism is not set, the global default parallelism is used as the default source parallelism.

The 'Max source parallelism' is the information that the runtime offers to the Source as a hint to infer the actual parallelism; a name with a max prefix but calculated as a minimum value confuses me a lot, especially when I read the HiveSource pseudocode:

fileEnumerator.setMinNumSplits(maxSourceParallelism);

Although I understand that naming is a complex topic in CS, could we improve this method name a little?

And, the `execution.batch.adaptive.auto-parallelism.default-source-parallelism` config has a default value of 1; how do we distinguish whether the user set it or not, for example if the user happens to set the value 1?

(2) About changing the default value of 'table.exec.hive.infer-source-parallelism'

The FLIP says: As a follow-up task, we will consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false.

No doubt that it's an API breaking change, and for existing Hive users the migration path is not clear in this FLIP. For example, current users used the number of splits to infer the source parallelism; after this FLIP, could we give a recommended value for `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or explain how to set it, or do users not need to set anything at all? And the replacement for migration should be added to 'table.exec.hive.infer-source-parallelism''s description when we propose to change its default value, right?

(3) [minor] About the HiveSource

The pseudocode shows:

fileEnumerator.getInferredSourceParallelsim();

IIRC, our public API FileEnumerator never offers such a method; is introducing getInferredSourceParallelsim() also one part of our FLIP?

Best,
Leonard
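The calculation Leonard quotes in (1) is plain min/fallback arithmetic, which is also why the "max computed as a minimum" naming grates. A self-contained sketch (method and parameter names are ours, not Flink's):

```java
import java.util.OptionalInt;

// The upper bound the runtime hands to the source, per the FLIP text quoted
// above: min(default-source-parallelism or global default, maxParallelism).
public class UpperBoundCalc {
    public static int upperBound(
            OptionalInt defaultSourceParallelism,
            int globalDefaultParallelism,
            int jobVertexMaxParallelism) {
        // Fall back to the global default when the option is unset.
        int base = defaultSourceParallelism.orElse(globalDefaultParallelism);
        // Never exceed the JobVertex's maxParallelism.
        return Math.min(base, jobVertexMaxParallelism);
    }

    public static void main(String[] args) {
        // default-source-parallelism set to 64, vertex max 128: bound is 64.
        System.out.println(upperBound(OptionalInt.of(64), 256, 128));
        // Unset: global default 256 applies, capped by vertex max 128.
        System.out.println(upperBound(OptionalInt.empty(), 256, 128));
    }
}
```

The OptionalInt parameter reflects the getOptional-based "was it set?" check discussed earlier in the thread.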
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Leonard for the feedback and sorry for my late response. > `How user disable the parallelism inference if they want to use fixed source parallelism?` > `Could you explain the priority the static parallelism set from table layer and the proposed dynamic source parallelism?` >From the user's perspective, if the user specifies a fixed parallelism for the source, dynamic source parallelism inference will be automatically disabled. From the perspective of priority, the user’s specified parallelism > the static parallelism inference > dynamic parallelism inference. Because the dynamic source parallelism inference will take effect at the runtime stage and the validity conditions are: (1) the current ExecutionGraph is a dynamic graph, and (2) the parallelism of the source vertex is not specified (that is, the parallelism is -1). > `the workflow for streaming job may looks like ... which is totally different, the later one lacks a lot of infra in Flink, right?` Indeed, as of now, the dynamic parallelism inference is exclusively for batch jobs, so it only takes into account the necessary information for batch scenarios. In the future, when we introduce support for automatic parallelism inference in streaming jobs, we can include the required information for streaming jobs to avoid unnecessarily complicating the current design. Moreover, The workflow you mentioned seems a bit complicated. Our current idea is to perform the parallelism inference during the initialization phase of streaming jobs and proceed to schedule the entire job once the source parallelism is determined. This process will naturally occur during job startup, eliminating the need for additional restarts. > `So, could we consider the boundness info when design the interface? 
Both FileSource and HiveSource offer streaming read ability; imagine this case: a Flink streaming Hive source should not apply dynamic source parallelism even if it implemented the feature, as it is serving a streaming job.` Thanks for your feedback, it is really a good input. Currently, the dynamic parallelism inference logic is only triggered in batch jobs, so it will not be called in streaming jobs. In the future, if streaming jobs also support runtime parallelism inference, then theoretically the source can no longer be distinguished between streaming and batch jobs at the runtime stage. In addition, since the new interface is implemented together with the Source interface, the Source::getBoundedness() method is also available when inferring parallelism. Best regards, Xia Leonard Xu wrote on Wed, Nov 8, 2023 at 16:19: > Thanks Xia and Zhu Zhu for kicking off this discussion. > > Dynamic source parallelism inference is a useful feature for the batch > story. I have some comments about the current design. > > 1. How can users disable parallelism inference if they want to use a fixed > source parallelism? They can currently configure a fixed parallelism in the table layer, > as you explained above. > > 2. Could you explain the priority between the static parallelism set from the table > layer and the proposed dynamic source parallelism? Also, changing the default > value of `table.exec.hive.infer-source-parallelism` as a sub-task does not > resolve all cases, because other sources can set their own parallelism too. > > 3. The current design only works for batch jobs; the workflow for a streaming job > may look like: (1) infer parallelism for a streaming source like Kafka > (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) > schedule the streaming job from the savepoint, which is totally different; the > latter lacks a lot of infra in Flink, right? So, could we consider the > boundedness info when designing the interface?
Both FileSource and HiveSource > offer streaming read ability; imagine this case: a Flink streaming Hive > source should not apply dynamic source parallelism even if it implemented > the feature, as it is serving a streaming job. > > Best, > Leonard > > > > On Nov 1, 2023, at 6:21 PM, Xia Sun wrote: > > > > Thanks Lijie for the comments! > > 1. For the Hive source, dynamic parallelism inference in batch scenarios is a > > superset of static parallelism inference. As a follow-up task, we can > > consider changing the default value of > > 'table.exec.hive.infer-source-parallelism' to false. > > > > 2. I think that both dynamic parallelism inference and static parallelism > > inference have their own use cases. Currently, for streaming sources and > > other sources that are not sensitive to dynamic information, the benefits > > of dynamic parallelism inference may not be significant. In such cases, we > > can continue to use static parallelism inference. > > > > Thanks, > > Xia > > > > Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52: > > > >> Hi Xia, > >> > >> Thanks for driving this FLIP, +1 for the proposal. > >> > >> I have 2 questions about the relationship between static inference and > >> dynamic inference: > >> > >> 1. AFAIK, the hive table source currently enables static inference by >
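Xia's priority rules in this message (user-specified parallelism > static inference > dynamic inference, with dynamic inference gated on a dynamic ExecutionGraph and an unset parallelism of -1) can be sketched as a simple decision function. This is illustrative only, not actual scheduler code; all names below are invented.

```java
public class SourceParallelismPriority {

    // Flink's convention: -1 means "parallelism not specified".
    static final int UNSPECIFIED = -1;

    /**
     * Illustrative resolution order: an explicitly specified parallelism wins,
     * then static (compile-time) inference, then dynamic (runtime) inference.
     * Dynamic inference only applies on a dynamic execution graph while the
     * source vertex parallelism is still unspecified.
     */
    static int resolve(int userSpecified, int staticInferred, int dynamicInferred, boolean isDynamicGraph) {
        if (userSpecified != UNSPECIFIED) {
            return userSpecified;
        }
        if (staticInferred != UNSPECIFIED) {
            return staticInferred;
        }
        if (isDynamicGraph && dynamicInferred != UNSPECIFIED) {
            return dynamicInferred;
        }
        return UNSPECIFIED; // left to the scheduler's defaults
    }
}
```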
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Xia and Zhu Zhu for kicking off this discussion. Dynamic source parallelism inference is a useful feature for the batch story. I have some comments about the current design. 1. How can users disable parallelism inference if they want to use a fixed source parallelism? They can currently configure a fixed parallelism in the table layer, as you explained above. 2. Could you explain the priority between the static parallelism set from the table layer and the proposed dynamic source parallelism? Also, changing the default value of `table.exec.hive.infer-source-parallelism` as a sub-task does not resolve all cases, because other sources can set their own parallelism too. 3. The current design only works for batch jobs; the workflow for a streaming job may look like: (1) infer parallelism for a streaming source like Kafka, (2) stop the job with a savepoint, (3) apply the new parallelism to the job, (4) schedule the streaming job from the savepoint. That is totally different, and the latter lacks a lot of infra in Flink, right? So, could we consider the boundedness info when designing the interface? Both FileSource and HiveSource offer streaming read ability; imagine this case: a Flink streaming Hive source should not apply dynamic source parallelism even if it implemented the feature, as it is serving a streaming job. Best, Leonard > On Nov 1, 2023, at 6:21 PM, Xia Sun wrote: > > Thanks Lijie for the comments! > 1. For the Hive source, dynamic parallelism inference in batch scenarios is a > superset of static parallelism inference. As a follow-up task, we can > consider changing the default value of > 'table.exec.hive.infer-source-parallelism' to false. > > 2. I think that both dynamic parallelism inference and static parallelism > inference have their own use cases. Currently, for streaming sources and > other sources that are not sensitive to dynamic information, the benefits > of dynamic parallelism inference may not be significant. In such cases, we > can continue to use static parallelism inference.
> > Thanks, > Xia > > Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52: > >> Hi Xia, >> >> Thanks for driving this FLIP, +1 for the proposal. >> >> I have 2 questions about the relationship between static inference and >> dynamic inference: >> >> 1. AFAIK, the hive table source currently enables static inference by >> default. In this case, which one (static vs dynamic) will take effect? I >> think it would be better if we could point this out in the FLIP. >> >> 2. As you mentioned above, dynamic inference is the most ideal way, so do >> we have a plan to deprecate static inference in the future? >> >> Best, >> Lijie >> >> Zhu Zhu wrote on Tue, Oct 31, 2023 at 20:19: >> >>> Thanks for opening the FLIP and kicking off this discussion, Xia! >>> The proposed changes make up an important missing part of the dynamic >>> parallelism inference of the adaptive batch scheduler. >>> >>> Besides that, it is also a good step towards supporting dynamic >>> parallelism inference for streaming sources, e.g. allowing Kafka >>> sources to determine their parallelism automatically based on the >>> number of partitions. >>> >>> +1 for the proposal. >>> >>> Thanks, >>> Zhu >>> >>> Xia Sun wrote on Tue, Oct 31, 2023 at 16:01: >>> Hi everyone, I would like to start a discussion on FLIP-379: Dynamic source >>> parallelism inference for batch jobs[1]. In general, there are three main ways to set source parallelism for >> batch jobs: (1) User-defined source parallelism. (2) Connector static parallelism inference. (3) Dynamic parallelism inference. Compared to manually setting parallelism, automatic parallelism >> inference is easier to use and can better adapt to varying data volumes each day. However, static parallelism inference cannot leverage runtime >>> information, resulting in inaccurate parallelism inference. Therefore, for batch >> jobs, dynamic parallelism inference is the most ideal, but currently the >>> support in the adaptive batch scheduler is not very comprehensive.
Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime. Please refer to the FLIP[1] document for more details about >> the proposed design and implementation. I also thank Zhu Zhu and LiJie Wang for their suggestions during the pre-discussion. Looking forward to your feedback and suggestions, thanks. [1] >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs Best regards, Xia >>> >>
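Leonard's boundedness point at the top of this message (a source serving a streaming job should not apply dynamic inference even if it implements the feature) could be expressed roughly as follows. This is an illustrative sketch: the enum is a local stand-in for Flink's org.apache.flink.api.connector.source.Boundedness so the snippet compiles standalone, and the method name is invented.

```java
public class BoundednessGate {

    // Local stand-in for Flink's org.apache.flink.api.connector.source.Boundedness,
    // kept here so the sketch compiles without Flink on the classpath.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /**
     * Illustrative gate: apply the dynamically inferred parallelism only for
     * bounded (batch) reads; an unbounded (streaming) read keeps whatever
     * parallelism was configured up front.
     */
    static int chooseParallelism(Boundedness boundedness, int dynamicInferred, int configured) {
        return boundedness == Boundedness.BOUNDED ? dynamicInferred : configured;
    }
}
```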
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks Lijie for the comments! 1. For the Hive source, dynamic parallelism inference in batch scenarios is a superset of static parallelism inference. As a follow-up task, we can consider changing the default value of 'table.exec.hive.infer-source-parallelism' to false. 2. I think that both dynamic parallelism inference and static parallelism inference have their own use cases. Currently, for streaming sources and other sources that are not sensitive to dynamic information, the benefits of dynamic parallelism inference may not be significant. In such cases, we can continue to use static parallelism inference. Thanks, Xia Lijie Wang wrote on Wed, Nov 1, 2023 at 14:52: > Hi Xia, > > Thanks for driving this FLIP, +1 for the proposal. > > I have 2 questions about the relationship between static inference and > dynamic inference: > > 1. AFAIK, the hive table source currently enables static inference by > default. In this case, which one (static vs dynamic) will take effect? I > think it would be better if we could point this out in the FLIP. > > 2. As you mentioned above, dynamic inference is the most ideal way, so do > we have a plan to deprecate static inference in the future? > > Best, > Lijie > > Zhu Zhu wrote on Tue, Oct 31, 2023 at 20:19: > > Thanks for opening the FLIP and kicking off this discussion, Xia! > > The proposed changes make up an important missing part of the dynamic > > parallelism inference of the adaptive batch scheduler. > > > > Besides that, it is also a good step towards supporting dynamic > > parallelism inference for streaming sources, e.g. allowing Kafka > > sources to determine their parallelism automatically based on the > > number of partitions. > > > > +1 for the proposal. > > > > Thanks, > > Zhu > > > > Xia Sun wrote on Tue, Oct 31, 2023 at 16:01: > > > > > Hi everyone, > > > I would like to start a discussion on FLIP-379: Dynamic source > > parallelism > > > inference for batch jobs[1].
> > > > > > In general, there are three main ways to set source parallelism for > batch > > > jobs: > > > (1) User-defined source parallelism. > > > (2) Connector static parallelism inference. > > > (3) Dynamic parallelism inference. > > > > > > Compared to manually setting parallelism, automatic parallelism > inference > > > is easier to use and can better adapt to varying data volumes each day. > > > However, static parallelism inference cannot leverage runtime > > information, > > > resulting in inaccurate parallelism inference. Therefore, for batch > jobs, > > > dynamic parallelism inference is the most ideal, but currently, the > > support > > > for adaptive batch scheduler is not very comprehensive. > > > > > > Therefore, we aim to introduce a general interface that enables the > > > adaptive batch scheduler to dynamically infer the source parallelism at > > > runtime. Please refer to the FLIP[1] document for more details about > the > > > proposed design and implementation. > > > > > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the > > > pre-discussion. > > > Looking forward to your feedback and suggestions, thanks. > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs > > > > > > Best regards, > > > Xia > > > > > >
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi Xia, Thanks for driving this FLIP, +1 for the proposal. I have 2 questions about the relationship between static inference and dynamic inference: 1. AFAIK, the hive table source currently enables static inference by default. In this case, which one (static vs dynamic) will take effect? I think it would be better if we could point this out in the FLIP. 2. As you mentioned above, dynamic inference is the most ideal way, so do we have a plan to deprecate static inference in the future? Best, Lijie Zhu Zhu wrote on Tue, Oct 31, 2023 at 20:19: > Thanks for opening the FLIP and kicking off this discussion, Xia! > The proposed changes make up an important missing part of the dynamic > parallelism inference of the adaptive batch scheduler. > > Besides that, it is also a good step towards supporting dynamic > parallelism inference for streaming sources, e.g. allowing Kafka > sources to determine their parallelism automatically based on the > number of partitions. > > +1 for the proposal. > > Thanks, > Zhu > > Xia Sun wrote on Tue, Oct 31, 2023 at 16:01: > > Hi everyone, > > I would like to start a discussion on FLIP-379: Dynamic source > parallelism > > inference for batch jobs[1]. > > > > In general, there are three main ways to set source parallelism for batch > > jobs: > > (1) User-defined source parallelism. > > (2) Connector static parallelism inference. > > (3) Dynamic parallelism inference. > > > > Compared to manually setting parallelism, automatic parallelism inference > > is easier to use and can better adapt to varying data volumes each day. > > However, static parallelism inference cannot leverage runtime information, > > resulting in inaccurate parallelism inference. Therefore, for batch jobs, > > dynamic parallelism inference is the most ideal, but currently the support > > in the adaptive batch scheduler is not very comprehensive.
> > > > Therefore, we aim to introduce a general interface that enables the > > adaptive batch scheduler to dynamically infer the source parallelism at > > runtime. Please refer to the FLIP[1] document for more details about the > > proposed design and implementation. > > > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the > > pre-discussion. > > Looking forward to your feedback and suggestions, thanks. > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs > > > > Best regards, > > Xia > > >
Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Thanks for opening the FLIP and kicking off this discussion, Xia! The proposed changes make up an important missing part of the dynamic parallelism inference of the adaptive batch scheduler. Besides that, it is also a good step towards supporting dynamic parallelism inference for streaming sources, e.g. allowing Kafka sources to determine their parallelism automatically based on the number of partitions. +1 for the proposal. Thanks, Zhu Xia Sun wrote on Tue, Oct 31, 2023 at 16:01: > Hi everyone, > I would like to start a discussion on FLIP-379: Dynamic source parallelism > inference for batch jobs[1]. > > In general, there are three main ways to set source parallelism for batch > jobs: > (1) User-defined source parallelism. > (2) Connector static parallelism inference. > (3) Dynamic parallelism inference. > > Compared to manually setting parallelism, automatic parallelism inference > is easier to use and can better adapt to varying data volumes each day. > However, static parallelism inference cannot leverage runtime information, > resulting in inaccurate parallelism inference. Therefore, for batch jobs, > dynamic parallelism inference is the most ideal, but currently the support > in the adaptive batch scheduler is not very comprehensive. > > Therefore, we aim to introduce a general interface that enables the > adaptive batch scheduler to dynamically infer the source parallelism at > runtime. Please refer to the FLIP[1] document for more details about the > proposed design and implementation. > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the > pre-discussion. > Looking forward to your feedback and suggestions, thanks. > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs > > Best regards, > Xia >
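Zhu Zhu's Kafka example could eventually look something like the following. This is speculative: Kafka sources do not do this today, and the method name is invented for illustration.

```java
public class KafkaParallelismSketch {

    /**
     * Speculative future direction: a Kafka-style source could propose a
     * parallelism equal to its partition count, so each subtask reads one
     * partition, clamped to a scheduler-provided upper bound (and to at
     * least 1 when no partitions are found).
     */
    static int inferFromPartitions(int partitionCount, int upperBound) {
        return Math.max(1, Math.min(partitionCount, upperBound));
    }
}
```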
[DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs
Hi everyone, I would like to start a discussion on FLIP-379: Dynamic source parallelism inference for batch jobs[1]. In general, there are three main ways to set source parallelism for batch jobs: (1) User-defined source parallelism. (2) Connector static parallelism inference. (3) Dynamic parallelism inference. Compared to manually setting parallelism, automatic parallelism inference is easier to use and can better adapt to varying data volumes each day. However, static parallelism inference cannot leverage runtime information, resulting in inaccurate parallelism inference. Therefore, for batch jobs, dynamic parallelism inference is the most ideal, but currently the support in the adaptive batch scheduler is not very comprehensive. Therefore, we aim to introduce a general interface that enables the adaptive batch scheduler to dynamically infer the source parallelism at runtime. Please refer to the FLIP[1] document for more details about the proposed design and implementation. I also thank Zhu Zhu and LiJie Wang for their suggestions during the pre-discussion. Looking forward to your feedback and suggestions, thanks. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs Best regards, Xia
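For discussion purposes, the general interface proposed in the FLIP is roughly of the following shape. This is a sketch based on the FLIP document as of this mail; method names may still change during the discussion, and the size-based example implementation and the ctxOf helper are invented here for illustration, not part of the proposal.

```java
public class Flip379InterfaceSketch {

    // Sketch of the interface proposed in FLIP-379; names follow the FLIP
    // document at the time of this mail and may still change.
    interface DynamicParallelismInference {

        /** Runtime information passed in by the adaptive batch scheduler. */
        interface Context {
            /** Upper bound that the inferred parallelism must not exceed. */
            int getMaxSourceParallelism();

            /** Target data volume each subtask should process, in bytes. */
            long getDataVolumePerTask();
        }

        /** Invoked by the scheduler before the source vertex is scheduled. */
        int inferParallelism(Context context);
    }

    /** Invented example: size-based inference for a file-like source. */
    static DynamicParallelismInference forTotalBytes(long totalBytes) {
        return ctx -> {
            long perTask = ctx.getDataVolumePerTask();
            int wanted = (int) ((totalBytes + perTask - 1) / perTask);
            return Math.max(1, Math.min(wanted, ctx.getMaxSourceParallelism()));
        };
    }

    /** Invented helper building a fixed Context for the example. */
    static DynamicParallelismInference.Context ctxOf(int upperBound, long perTask) {
        return new DynamicParallelismInference.Context() {
            public int getMaxSourceParallelism() { return upperBound; }
            public long getDataVolumePerTask() { return perTask; }
        };
    }
}
```

With this shape, a 10 MiB input and a 1 MiB per-task target would infer 10 subtasks unless the scheduler's upper bound is lower.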