Thanks Leonard for the detailed feedback and input.

> The 'Max source parallelism' is the information that the runtime offers to
> the Source as a hint to infer the actual parallelism. A name with a max
> prefix that is calculated as a minimum value confuses me a lot, especially
> when I read the HiveSource pseudocode:
>
> fileEnumerator.setMinNumSplits(maxSourceParallelism);
>
> Although I understand that naming is a complex topic in CS, could we
> improve this method name a little?

Thanks for pointing this out; the current wording is indeed confusing. It
stems from the existing implementation of the AdaptiveBatchScheduler, where
the dynamically inferred parallelism cannot exceed the JobVertex's
maxParallelism (typically either the global default max parallelism or the
user-specified JobVertex max parallelism), so the FLIP keeps that logic. I
have updated the FLIP to avoid the confusion as much as possible.
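
To make the bound concrete, here is a minimal sketch of the calculation as
it was worded in the FLIP text quoted further down this thread. The class
and method names are hypothetical and only for illustration; they are not
part of Flink or of the FLIP.

```java
// Hypothetical helper for illustration only; this is not Flink code.
final class SourceParallelismBound {

    // Upper bound handed to the source: the smaller of the configured default
    // source parallelism and the JobVertex max parallelism.
    static int upperBound(int defaultSourceParallelism, int jobVertexMaxParallelism) {
        if (defaultSourceParallelism < 1 || jobVertexMaxParallelism < 1) {
            throw new IllegalArgumentException("parallelism values must be positive");
        }
        return Math.min(defaultSourceParallelism, jobVertexMaxParallelism);
    }

    public static void main(String[] args) {
        // Example values are arbitrary.
        System.out.println(upperBound(200, 128)); // limited by the JobVertex max parallelism
        System.out.println(upperBound(64, 128));  // limited by the default source parallelism
    }
}
```

The result is an upper bound for the inferred parallelism even though it is
computed with a minimum, which is where the naming confusion came from.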

> And the
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> config has a default value of 1, so how do we distinguish whether the user
> set it or not, for example if the user happens to set the value to 1?

We can use Configuration::getOptional to check if the user has configured
the `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
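
As a rough, self-contained sketch of the idea: an Optional-returning lookup
is empty when the key was never set, so an explicit 1 can be told apart from
the default 1. The Map below is only a stand-in for Flink's Configuration,
and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for illustration; a plain Map plays the role of
// Flink's Configuration here.
final class ExplicitOrDefault {

    static final String KEY =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";
    static final int DEFAULT_VALUE = 1; // default mentioned in this thread

    // Mimics an Optional-returning lookup: empty when the key was never set.
    static Optional<Integer> getOptional(Map<String, String> userConfig, String key) {
        return Optional.ofNullable(userConfig.get(key)).map(Integer::parseInt);
    }

    static boolean isUserConfigured(Map<String, String> userConfig) {
        return getOptional(userConfig, KEY).isPresent();
    }

    static int effectiveValue(Map<String, String> userConfig) {
        return getOptional(userConfig, KEY).orElse(DEFAULT_VALUE);
    }

    public static void main(String[] args) {
        Map<String, String> unset = new HashMap<>();
        Map<String, String> setToOne = new HashMap<>();
        setToOne.put(KEY, "1");

        // Both resolve to the same value, but only one was set by the user.
        System.out.println(effectiveValue(unset) + " " + isUserConfigured(unset));
        System.out.println(effectiveValue(setToOne) + " " + isUserConfigured(setToOne));
    }
}
```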

> No doubt it's an API-breaking change. For existing Hive users, the
> migration path is not clear in this FLIP. For example, current users use
> the number of splits to infer the source parallelism; after this FLIP, could
> we give a recommended value for
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or
> explain how to set it, or even state that users do not need to set anything?
> And the migration replacement should be added to the description of
> 'table.exec.hive.infer-source-parallelism' when we propose to change its
> default value, right?

As a follow-up task, we may have a dedicated discussion in the future to
see if we need to change the default value of
`table.exec.hive.infer-source-parallelism` to false. Until then, users can
manually set `table.exec.hive.infer-source-parallelism` to false to enable
dynamic parallelism inference, and use
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
inference upper bound. I have updated both the DynamicParallelismInference
interface implementation and the Migration Plan sections of the FLIP to
illustrate this.
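
The manual migration described above could be sketched roughly as follows.
The helper class is hypothetical and works on a plain key/value map rather
than on any Flink API. Carrying the old `.max` value over unchanged, and not
overriding an upper bound the user already set, are assumptions made for
this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it rewrites a plain key/value
// view of the job configuration and is not part of Flink.
final class HiveParallelismMigration {

    static final String STATIC_INFER = "table.exec.hive.infer-source-parallelism";
    static final String STATIC_INFER_MAX = "table.exec.hive.infer-source-parallelism.max";
    static final String DYNAMIC_DEFAULT =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";

    static Map<String, String> migrate(Map<String, String> oldConfig) {
        Map<String, String> result = new HashMap<>(oldConfig);
        // Turn off static inference so that dynamic inference can take effect.
        result.put(STATIC_INFER, "false");
        // Assumption: reuse the old static upper bound as the new upper bound,
        // unless the user already configured the new option explicitly.
        String oldMax = oldConfig.get(STATIC_INFER_MAX);
        if (oldMax != null) {
            result.putIfAbsent(DYNAMIC_DEFAULT, oldMax);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> oldConfig = new HashMap<>();
        oldConfig.put(STATIC_INFER_MAX, "500"); // illustrative value only
        System.out.println(migrate(oldConfig));
    }
}
```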

> The pseudocode shows:
>
> fileEnumerator.getInferredSourceParallelsim();
>
> IIRC, our public API FileEnumerator never offers such a method. Is
> introducing getInferredSourceParallelsim() also part of our FLIP?

The intent was to make the pseudocode easier to understand, but it did
introduce some confusion. There are no plans to introduce
getInferredSourceParallelism() in HiveSource, and I have updated the
HiveSource pseudocode in the FLIP.

Best regards,

Xia

Leonard Xu <xbjt...@gmail.com> wrote on Thu, Nov 16, 2023 at 12:36:

> Thanks Xia for the detailed reply.
>
> >> `How do users disable the parallelism inference if they want to use a
> >> fixed source parallelism?`
> >> `Could you explain the priority between the static parallelism set from
> >> the table layer and the proposed dynamic source parallelism?`
> >
> > From the user's perspective, if the user specifies a fixed parallelism for
> > the source, dynamic source parallelism inference will be automatically
> > disabled. From the perspective of priority, the user’s specified
> > parallelism > the static parallelism inference > dynamic parallelism
> > inference. Because the dynamic source parallelism inference will take
> > effect at the runtime stage and the validity conditions are: (1) the
> > current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
> > source vertex is not specified (that is, the parallelism is -1).
>
> The priority explanation makes sense to me. Could you also add this
> explanation to the FLIP?
>
> >> `So, could we consider the boundedness info when designing the interface?
> >> Both FileSource and Hive Source offer streaming read ability. Imagine this
> >> case: the Flink streaming Hive Source should not apply the dynamic source
> >> parallelism even if it implemented the feature, as it is serving a
> >> streaming job.`
> >
> > Thanks for your feedback, it is really a good input. Currently, the
> > dynamic parallelism inference logic is only triggered in batch jobs.
> > Therefore, the logic will not be called in streaming jobs.
> > In the future, if streaming jobs also support runtime parallelism
> > inference, then theoretically, the source can no longer be distinguished
> > between streaming jobs and batch jobs at the runtime stage. In addition,
> > since the new interface is implemented together with the Source interface,
> > the Source::getBoundedness() method can also be called when inferring
> > parallelism.
>
> +1 about the boundedness info part.
>
> Okay, let's come back to the batch world and discuss some details about
> the current design:
>
> (1) About max source parallelism
> The FLIP said: (1) Max source parallelism, which is calculated as the
> minimum of the default source parallelism
> (`execution.batch.adaptive.auto-parallelism.default-source-parallelism`)
> and JobVertex#maxParallelism. If the default-source-parallelism is not set,
> the global default parallelism is used as the default source parallelism.
>
> The 'Max source parallelism' is the information that the runtime offers to
> the Source as a hint to infer the actual parallelism. A name with a max
> prefix that is calculated as a minimum value confuses me a lot, especially
> when I read the HiveSource pseudocode:
>
> fileEnumerator.setMinNumSplits(maxSourceParallelism);
>
> Although I understand that naming is a complex topic in CS, could we
> improve this method name a little? And the
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> config has a default value of 1, so how do we distinguish whether the user
> set it or not, for example if the user happens to set the value to 1?
>
> (2) About changing the default value of
> 'table.exec.hive.infer-source-parallelism'
> The FLIP said: As a follow-up task, we will consider changing the default
> value of 'table.exec.hive.infer-source-parallelism' to false.
>
> No doubt it's an API-breaking change. For existing Hive users, the
> migration path is not clear in this FLIP. For example, current users use
> the number of splits to infer the source parallelism; after this FLIP, could
> we give a recommended value for
> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`, or
> explain how to set it, or even state that users do not need to set anything?
> And the migration replacement should be added to the description of
> 'table.exec.hive.infer-source-parallelism' when we propose to change its
> default value, right?
>
> (3) [minor] About the HiveSource
> The pseudocode shows:
>
> fileEnumerator.getInferredSourceParallelsim();
>
> IIRC, our public API FileEnumerator never offers such a method. Is
> introducing getInferredSourceParallelsim() also part of our FLIP?
>
> Best,
> Leonard
>
>
>
> >
> > Best regards,
> > Xia
> >
> > Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 8, 2023 at 16:19:
> >
> >> Thanks Xia and Zhu Zhu for kicking off this discussion.
> >>
> >> The dynamic source parallelism inference is a useful feature for the batch
> >> story. I have some comments about the current design.
> >>
> >> 1. How do users disable the parallelism inference if they want to use a
> >> fixed source parallelism? They can configure fixed parallelism in the table
> >> layer currently, as you explained above.
> >>
> >> 2. Could you explain the priority between the static parallelism set from
> >> the table layer and the proposed dynamic source parallelism? And changing
> >> the default value of `table.exec.hive.infer-source-parallelism` as a
> >> sub-task does not resolve all cases, because other Sources can set their
> >> own parallelism too.
> >>
> >> 3. The current design only works for batch jobs. The workflow for a
> >> streaming job may look like (1) infer parallelism for a streaming source
> >> like Kafka (2) stop the job with a savepoint (3) apply the new parallelism
> >> to the job (4) schedule the streaming job from the savepoint, which is
> >> totally different; the latter lacks a lot of infra in Flink, right? So,
> >> could we consider the boundedness info when designing the interface? Both
> >> FileSource and Hive Source offer streaming read ability. Imagine this case:
> >> the Flink streaming Hive Source should not apply the dynamic source
> >> parallelism even if it implemented the feature, as it is serving a
> >> streaming job.
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >>> On Nov 1, 2023, at 6:21 PM, Xia Sun <xingbe...@gmail.com> wrote:
> >>>
> >>> Thanks Lijie for the comments!
> >>> 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> >>> superset of static parallelism inference. As a follow-up task, we can
> >>> consider changing the default value of
> >>> 'table.exec.hive.infer-source-parallelism' to false.
> >>>
> >>> 2. I think that both dynamic parallelism inference and static parallelism
> >>> inference have their own use cases. Currently, for streaming sources and
> >>> other sources that are not sensitive to dynamic information, the benefits
> >>> of dynamic parallelism inference may not be significant. In such cases, we
> >>> can continue to use static parallelism inference.
> >>>
> >>> Thanks,
> >>> Xia
> >>>
> >>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Nov 1, 2023 at 14:52:
> >>>
> >>>> Hi Xia,
> >>>>
> >>>> Thanks for driving this FLIP, +1 for the proposal.
> >>>>
> >>>> I have 2 questions about the relationship between static inference and
> >>>> dynamic inference:
> >>>>
> >>>> 1. AFAIK, currently the Hive table source enables static inference by
> >>>> default. In this case, which one (static vs dynamic) will take effect? I
> >>>> think it would be better if we can point this out in the FLIP.
> >>>>
> >>>> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> >>>> we have a plan to deprecate the static inference in the future?
> >>>>
> >>>> Best,
> >>>> Lijie
> >>>>
> >>>> Zhu Zhu <reed...@gmail.com> wrote on Tue, Oct 31, 2023 at 20:19:
> >>>>
> >>>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>>>> The proposed changes make up an important missing part of the dynamic
> >>>>> parallelism inference of adaptive batch scheduler.
> >>>>>
> >>>>> Besides that, it is also a good step towards supporting dynamic
> >>>>> parallelism inference for streaming sources, e.g. allowing Kafka
> >>>>> sources to determine their parallelism automatically based on the
> >>>>> number of partitions.
> >>>>>
> >>>>> +1 for the proposal.
> >>>>>
> >>>>> Thanks,
> >>>>> Zhu
> >>>>>
> >>>>> Xia Sun <xingbe...@gmail.com> wrote on Tue, Oct 31, 2023 at 16:01:
> >>>>>
> >>>>>> Hi everyone,
> >>>>>> I would like to start a discussion on FLIP-379: Dynamic source
> >>>>>> parallelism inference for batch jobs[1].
> >>>>>>
> >>>>>> In general, there are three main ways to set source parallelism for
> >>>>>> batch jobs:
> >>>>>> (1) User-defined source parallelism.
> >>>>>> (2) Connector static parallelism inference.
> >>>>>> (3) Dynamic parallelism inference.
> >>>>>>
> >>>>>> Compared to manually setting parallelism, automatic parallelism
> >>>>>> inference is easier to use and can better adapt to varying data volumes
> >>>>>> each day. However, static parallelism inference cannot leverage runtime
> >>>>>> information, resulting in inaccurate parallelism inference. Therefore,
> >>>>>> for batch jobs, dynamic parallelism inference is the most ideal, but
> >>>>>> currently, the support for adaptive batch scheduler is not very
> >>>>>> comprehensive.
> >>>>>>
> >>>>>> Therefore, we aim to introduce a general interface that enables the
> >>>>>> adaptive batch scheduler to dynamically infer the source parallelism at
> >>>>>> runtime. Please refer to the FLIP[1] document for more details about the
> >>>>>> proposed design and implementation.
> >>>>>>
> >>>>>> I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> >>>>>> pre-discussion.
> >>>>>> Looking forward to your feedback and suggestions, thanks.
> >>>>>>
> >>>>>> [1]
> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>>>
> >>>>>> Best regards,
> >>>>>> Xia
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
>
>
