Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is
no further discussion.

Best,
Xia

Leonard Xu <xbjt...@gmail.com> 于2023年11月24日周五 18:50写道:

> Thanks Xia and Zhu Zhu for driving this work,
>
> It will help unify the parallelism inference for all operators of batch
> job, the updated FLIP looks good to me.
>
> Best,
> Leonard
>
>
> > 2023年11月24日 下午5:53,Xia Sun <xingbe...@gmail.com> 写道:
> >
> > Hi all,
> > Offline discussed with Zhu Zhu and Leonard Xu and we have reached the
> > following three points of consensus:
> >
> > 1. Rename the interface method Context#getMaxSourceParallelism proposed
> by
> > the FLIP to Context#getParallelismInferenceUpperBound, to make the
> meaning
> > of the method clearer. See [1] for details.
> >
> > 2. We provide a more detailed explanation of the effective priority of
> the
> > dynamic source parallelism inference proposed by this FLIP and the order
> of
> > values for the upper bound of source parallelism. We also point out the
> > current support and limitations of the AdaptiveBatchScheduler regarding
> > source parallelism inference. See [2] for details.
> >
> > 3. This FLIP will only focus on the framework-level implementation and
> will
> > prioritize the implementation of FileSource as an example of the new
> > interface proposed by the FLIP. The HiveSource, due to its existing
> static
> > parallelism dynamic inference, and changes in default values for
> > configuration items such as `table.exec.hive.infer-source-parallelism`,
> > requires a more detailed migration plan, as well as more comprehensive
> > design and discussion. It is not suitable as part of this FLIP and needs
> a
> > separate FLIP. Therefore, we have removed the HiveSource part from this
> > FLIP.
> >
> > Thanks again to everyone who participated in the discussion.
> > Looking forward to your continued feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges
> >
> > Best,
> > Xia
> >
> > Leonard Xu <xbjt...@gmail.com> 于2023年11月22日周三 18:37写道:
> >
> >> Thanks Xia for the  reply, sorry for the late reply.
> >>
> >>> Thanks for pointing out the issue, the current wording does indeed seem
> >> to
> >>> be confusing. It involves the existing implementation of the
> >>> AdaptiveBatchScheduler, where the dynamically inferred parallelism
> cannot
> >>> exceed the JobVertex's maxParallelism (which is typically set to either
> >> the
> >>> global default max parallelism or the user-specified JobVertex max
> >>> parallelism), so the flip maintains the logic. I have modified the flip
> >> to
> >>> avoid confusion as much as possible.
> >>
> >> I didn’t see the change part in this FLIP, could you check it?
> >>
> >>> We can use Configuration::getOptional to check if the user has
> configured
> >>> the
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
> >>
> >> Using Readable#getOptional(ConfigOption<T> option) makes sense to me.
> >>
> >>> As a follow-up task, we may have a dedicated discussion in the future
> to
> >>> see if we need to change the default value of
> >>> `table.exec.hive.infer-source-parallelism` to false. Before then, user
> >> can
> >>> manually set `table.exec.hive.infer-source-parallelism` to false to
> >> enable
> >>> dynamic parallelism inference, and use
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> to
> >>> replace `table.exec.hive.infer-source-parallelism.max` as the
> parallelism
> >>> inference upper bound. I have updated both the Flip's
> >>> DynamicParallelismInference interface implementation and Migration Plan
> >>> modules to illustrate this.
> >>
> >> In my opinion, moving HiveSource to subsequent discussion is not OK, see
> >> my explanation: HiveSource supports dynamic source parallel inference is
> >> one part of the FLIP implementation, it looks like that we introduce a
> >> configuration
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >> which conflicts with existing configuration `table.exec.hive.infer-
> >> source-parallelism`. But we do not provide conflict resolution in the
> >> current flip, just postpone the work to future discussions, this
> uncertain
> >> state is likely to cause subsequent discussions to be shelved from past
> >> community work experience. I’d suggest we make this part clear in this
> FLIP.
> >>
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>>
> >>>
> >>> Leonard Xu <xbjt...@gmail.com> 于2023年11月16日周四 12:36写道:
> >>>
> >>>> Thanks Xia for the detailed reply.
> >>>>
> >>>>>> `How user disable the parallelism inference if they want to use
> fixed
> >>>> source parallelism?`
> >>>>>> `Could you explain the priority the static parallelism set from
> table
> >>>> layer and the proposed dynamic source parallelism?`
> >>>>>
> >>>>> From the user's perspective, if the user specifies a fixed
> parallelism
> >>>> for
> >>>>> the source, dynamic source parallelism inference will be
> automatically
> >>>>> disabled. From the perspective of priority, the user’s specified
> >>>>> parallelism > the static parallelism inference > dynamic parallelism
> >>>>> inference. Because the dynamic source parallelism inference will take
> >>>>> effect at the runtime stage and the validity conditions are: (1) the
> >>>>> current ExecutionGraph is a dynamic graph, and (2) the parallelism of
> >> the
> >>>>> source vertex is not specified (that is, the parallelism is -1).
> >>>>
> >>>> The priority explanation make sense to me, could you also add this
> >>>> explanation to FLIP?
> >>>>
> >>>>>> `So, could we consider the boundness info when design the interface?
> >>>> Both
> >>>>> FileSource and Hive Source offer streaming read ability, imaging this
> >>>> case:
> >>>>> Flink Streaming Hive Source should not apply the dynamic source
> >>>> parallelism
> >>>>> even it implemented the feature as it severing a streaming job.`
> >>>>>
> >>>>> Thanks for your feedback, it is reallly a good input. Currently, the
> >>>>> dynamic parallelism inference logic is only triggered in batch jobs.
> >>>>> Therefore, the logic will not be called in the streaming jobs.
> >>>>> In the future, if streaming jobs also support runtime parallelism
> >>>>> inference, then theoretically, the source can no longer be
> >> distinguished
> >>>>> between streaming jobs and batch jobs at the runtime stage. In
> >> addition,
> >>>>> since the new interface is implemented together with the Source
> >>>> interface,
> >>>>> the Source::getBoundedness() method can also be obtained when
> inferring
> >>>>> parallelism.
> >>>>
> >>>> +1 about the boundedness info part.
> >>>>
> >>>> Okay, let’s come back the batch world and discuss some details about
> >>>> current design:
> >>>>
> >>>> (1) About  max source parallelism
> >>>> The FLIP said:  (1) Max source parallelism, which is calculated as the
> >>>> minimum of the default source parallelism
> >>>>
> (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`)
> >>>> and JobVertex#maxParallelism. If the default-source-parallelism is not
> >> set,
> >>>> the global default parallelism is used as the default source
> >> parallelism.
> >>>>
> >>>> The  'Max source parallelism’ is the information that runtime offered
> to
> >>>> Source as a hint to infer the actual parallelism, a name with max
> prefix
> >>>> but calculated with minimum value confusing me a lot, especially when
> I
> >>>> read the HiveSource pseudocode:
> >>>>
> >>>> fileEnumerator.setMinNumSplits(maxSourceParallelism);
> >>>>
> >>>> Although I understand that naming is a complex topic in CS, could we
> >>>> improve this method name a little?  And,
> >>>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >>>> config has a default value with 1, how we distinguish user set it or
> not
> >>>> for example if user happen to  set value 1 ?
> >>>>
> >>>> (2) About changing default value of
> >>>> 'table.exec.hive.infer-source-parallelism'
> >>>> The FLIP said:  As a follow-up task, we will consider changing the
> >> default
> >>>> value of 'table.exec.hive.infer-source-parallelism' to false.
> >>>>
> >>>> No doubt that it’s a API breaking change, for existing hive users, the
> >>>> migration path is not clear in this FLIP, for example, current users
> >> used
> >>>> splits number to infer the source parallelism, after this FLIP,  could
> >> we
> >>>> give the recommended value of
> >>>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >> or
> >>>> how to set it or event users do not need to set  anythins? And the
> >>>> replacement for migration replacement should add to
> >>>> 'table.exec.hive.infer-source-parallelism’s description when we
> propose
> >> to
> >>>> change its default value, right?
> >>>>
> >>>> (3) [minor] About the HiveSource
> >>>> The pseudocode  code shows:
> >>>>
> >>>> fileEnumerator.getInferredSourceParallelsim();
> >>>>
> >>>> IIRC, our public API FileEnumerator never offers such method,
> >> introducing
> >>>> getInferredSourceParallelsim() is also one part of our FLIP ?
> >>>>
> >>>> Best,
> >>>> Leonard
> >>>>
> >>>>
> >>>>
> >>>>>
> >>>>> Best regards,
> >>>>> Xia
> >>>>>
> >>>>> Leonard Xu <xbjt...@gmail.com> 于2023年11月8日周三 16:19写道:
> >>>>>
> >>>>>> Thanks Xia and Zhu Zhu for kickoff this discussion.
> >>>>>>
> >>>>>> The dynamic source parallelism inference is a useful feature for
> batch
> >>>>>> story. I’ve some comments about current design.
> >>>>>>
> >>>>>> 1.How user disable the parallelism inference if they want to use
> fixed
> >>>>>> source parallelism? They can configure fixed parallelism in table
> >> layer
> >>>>>> currently as you explained above.
> >>>>>>
> >>>>>> 2.Could you explain the priority the static parallelism set from
> table
> >>>>>> layer and the proposed dynamic source parallelism? And changing the
> >>>> default
> >>>>>> value `table.exec.hive.infer-source-parallelism` as a sub-task does
> >> not
> >>>>>> resolve all case, because other Sources can set their own
> parallelism
> >>>> too.
> >>>>>>
> >>>>>> 3.Current design only works for batch josb, the workflow for
> streaming
> >>>> job
> >>>>>> may looks like (1) inference  parallelism for streaming source like
> >>>> kafka
> >>>>>> (2) stop job with a savepoint  (3) apply new parallelism for job (4)
> >>>>>> schedule the streaming job from savepoint which is totally
> different,
> >>>> the
> >>>>>> later one lacks a lot of infra in Flink, right?  So, could we
> consider
> >>>> the
> >>>>>> boundness info when design the interface? Both FileSource and Hive
> >>>> Source
> >>>>>> offer streaming read ability, imaging this case: Flink Streaming
> Hive
> >>>>>> Source should not apply the dynamic source parallelism even it
> >>>> implemented
> >>>>>> the feature as it severing a streaming job.
> >>>>>>
> >>>>>> Best,
> >>>>>> Leonard
> >>>>>>
> >>>>>>
> >>>>>>> 2023年11月1日 下午6:21,Xia Sun <xingbe...@gmail.com> 写道:
> >>>>>>>
> >>>>>>> Thanks Lijie for the comments!
> >>>>>>> 1. For Hive source, dynamic parallelism inference in batch
> scenarios
> >>>> is a
> >>>>>>> superset of static parallelism inference. As a follow-up task, we
> can
> >>>>>>> consider changing the default value of
> >>>>>>> 'table.exec.hive.infer-source-parallelism' to false.
> >>>>>>>
> >>>>>>> 2. I think that both dynamic parallelism inference and static
> >>>> parallelism
> >>>>>>> inference have their own use cases. Currently, for streaming
> sources
> >>>> and
> >>>>>>> other sources that are not sensitive to dynamic information, the
> >>>> benefits
> >>>>>>> of dynamic parallelism inference may not be significant. In such
> >> cases,
> >>>>>> we
> >>>>>>> can continue to use static parallelism inference.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Xia
> >>>>>>>
> >>>>>>> Lijie Wang <wangdachui9...@gmail.com> 于2023年11月1日周三 14:52写道:
> >>>>>>>
> >>>>>>>> Hi Xia,
> >>>>>>>>
> >>>>>>>> Thanks for driving this FLIP, +1 for the proposal.
> >>>>>>>>
> >>>>>>>> I have 2 questions about the relationship between static inference
> >> and
> >>>>>>>> dynamic inference:
> >>>>>>>>
> >>>>>>>> 1. AFAIK, currently the hive table source enable static inference
> by
> >>>>>>>> default. In this case, which one (static vs dynamic) will take
> >> effect
> >>>> ?
> >>>>>> I
> >>>>>>>> think it would be better if we can point this out in FLIP
> >>>>>>>>
> >>>>>>>> 2. As you mentioned above, dynamic inference is the most ideal
> way,
> >> so
> >>>>>> do
> >>>>>>>> we have plan to deprecate the static inference in the future?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lijie
> >>>>>>>>
> >>>>>>>> Zhu Zhu <reed...@gmail.com> 于2023年10月31日周二 20:19写道:
> >>>>>>>>
> >>>>>>>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>>>>>>>> The proposed changes make up an important missing part of the
> >> dynamic
> >>>>>>>>> parallelism inference of adaptive batch scheduler.
> >>>>>>>>>
> >>>>>>>>> Besides that, it is also one good step towards supporting dynamic
> >>>>>>>>> parallelism inference for streaming sources, e.g. allowing Kafka
> >>>>>>>>> sources to determine its parallelism automatically based on the
> >>>>>>>>> number of partitions.
> >>>>>>>>>
> >>>>>>>>> +1 for the proposal.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Zhu
> >>>>>>>>>
> >>>>>>>>> Xia Sun <xingbe...@gmail.com> 于2023年10月31日周二 16:01写道:
> >>>>>>>>>
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>> I would like to start a discussion on FLIP-379: Dynamic source
> >>>>>>>>> parallelism
> >>>>>>>>>> inference for batch jobs[1].
> >>>>>>>>>>
> >>>>>>>>>> In general, there are three main ways to set source parallelism
> >> for
> >>>>>>>> batch
> >>>>>>>>>> jobs:
> >>>>>>>>>> (1) User-defined source parallelism.
> >>>>>>>>>> (2) Connector static parallelism inference.
> >>>>>>>>>> (3) Dynamic parallelism inference.
> >>>>>>>>>>
> >>>>>>>>>> Compared to manually setting parallelism, automatic parallelism
> >>>>>>>> inference
> >>>>>>>>>> is easier to use and can better adapt to varying data volumes
> each
> >>>>>> day.
> >>>>>>>>>> However, static parallelism inference cannot leverage runtime
> >>>>>>>>> information,
> >>>>>>>>>> resulting in inaccurate parallelism inference. Therefore, for
> >> batch
> >>>>>>>> jobs,
> >>>>>>>>>> dynamic parallelism inference is the most ideal, but currently,
> >> the
> >>>>>>>>> support
> >>>>>>>>>> for adaptive batch scheduler is not very comprehensive.
> >>>>>>>>>>
> >>>>>>>>>> Therefore, we aim to introduce a general interface that enables
> >> the
> >>>>>>>>>> adaptive batch scheduler to dynamically infer the source
> >> parallelism
> >>>>>> at
> >>>>>>>>>> runtime. Please refer to the FLIP[1] document for more details
> >> about
> >>>>>>>> the
> >>>>>>>>>> proposed design and implementation.
> >>>>>>>>>>
> >>>>>>>>>> I also thank Zhu Zhu and LiJie Wang for their suggestions during
> >> the
> >>>>>>>>>> pre-discussion.
> >>>>>>>>>> Looking forward to your feedback and suggestions, thanks.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>> Xia
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Reply via email to