Thanks Leonard for the feedback and sorry for my late response.

> `How can users disable the parallelism inference if they want to use a
fixed source parallelism?`
> `Could you explain the priority between the static parallelism set from
the table layer and the proposed dynamic source parallelism?`

From the user's perspective, if the user specifies a fixed parallelism for
the source, dynamic source parallelism inference is automatically
disabled. In terms of priority: the user-specified parallelism > static
parallelism inference > dynamic parallelism inference. This is because
dynamic source parallelism inference takes effect at the runtime stage,
and only when (1) the current ExecutionGraph is a dynamic graph, and
(2) the parallelism of the source vertex is not specified (that is, the
parallelism is -1).
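
To make the priority concrete, here is a minimal sketch in plain Java. It is
not Flink code: the class name, the method names, and the idea of passing the
three candidate values as plain ints are all assumptions made for
illustration. Only the ordering and the two validity conditions come from the
explanation above.

```java
/** Hypothetical helper, not part of Flink: models the priority described above. */
final class SourceParallelismPriority {

    /** Sentinel meaning "parallelism not specified", as stated in the text. */
    static final int PARALLELISM_UNSET = -1;

    /** Dynamic inference is valid only for a dynamic graph with an unset source parallelism. */
    static boolean dynamicInferenceApplies(boolean isDynamicGraph, int sourceVertexParallelism) {
        return isDynamicGraph && sourceVertexParallelism == PARALLELISM_UNSET;
    }

    /**
     * Resolves the source parallelism: user-specified first, then static
     * inference, then dynamic inference. Returns PARALLELISM_UNSET when none
     * applies, leaving any fallback to the caller.
     */
    static int resolve(int userSpecified, int staticInferred,
                       boolean isDynamicGraph, int dynamicInferred) {
        if (userSpecified != PARALLELISM_UNSET) {
            return userSpecified;
        }
        if (staticInferred != PARALLELISM_UNSET) {
            return staticInferred;
        }
        // Neither earlier stage set a value, so the vertex parallelism is still unset here.
        if (dynamicInferenceApplies(isDynamicGraph, PARALLELISM_UNSET)) {
            return dynamicInferred;
        }
        return PARALLELISM_UNSET;
    }
}
```

For example, `resolve(4, 8, true, 16)` yields the user's value 4, while
`resolve(-1, -1, true, 16)` falls through to the dynamically inferred 16.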

> `the workflow for a streaming job may look like ... which is totally
different; the latter lacks a lot of infra in Flink, right?`

Indeed, as of now, the dynamic parallelism inference is exclusively for
batch jobs, so it only takes into account the necessary information for
batch scenarios. In the future, when we introduce support for automatic
parallelism inference in streaming jobs, we can include the required
information for streaming jobs to avoid unnecessarily complicating the
current design.
Moreover, the workflow you mentioned seems a bit complicated. Our current
idea is to perform the parallelism inference during the initialization
phase of streaming jobs and proceed to schedule the entire job once the
source parallelism is determined. This process will naturally occur during
job startup, eliminating the need for additional restarts.

> `So, could we consider the boundedness info when designing the interface?
Both FileSource and Hive Source offer streaming read ability. Imagine this
case: a Flink streaming Hive source should not apply the dynamic source
parallelism even if it implements the feature, as it is serving a streaming job.`

Thanks for your feedback, it is really good input. Currently, the
dynamic parallelism inference logic is only triggered in batch jobs,
so it will not be called in streaming jobs.
In the future, if streaming jobs also support runtime parallelism
inference, then in theory the source can no longer tell from the call
alone whether it is running in a streaming job or a batch job at the
runtime stage. However, since the new interface is implemented together
with the Source interface, the Source::getBoundedness() method is also
available when inferring parallelism.
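
As a rough illustration of that last point, here is a self-contained sketch
in plain Java. Everything in it is a stand-in: the local `Boundedness` enum
only mirrors the two values of Flink's enum, and `InferringSource`,
`HiveLikeSource`, the `inferParallelism(int)` signature, and the use of -1 as
"no suggestion" are assumptions for this example, not the interface proposed
in the FLIP.

```java
/** Local stand-in mirroring the two values of Flink's Boundedness enum. */
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

/** Hypothetical, simplified view of a source that also implements parallelism inference. */
interface InferringSource {
    Boundedness getBoundedness();

    /** Hypothetical signature; returns -1 to mean "no suggestion". */
    int inferParallelism(int parallelismUpperBound);
}

/** Hypothetical source that can run bounded (batch) or unbounded (streaming). */
final class HiveLikeSource implements InferringSource {
    private final Boundedness boundedness;
    private final int splitCount;

    HiveLikeSource(Boundedness boundedness, int splitCount) {
        this.boundedness = boundedness;
        this.splitCount = splitCount;
    }

    @Override
    public Boundedness getBoundedness() {
        return boundedness;
    }

    @Override
    public int inferParallelism(int parallelismUpperBound) {
        // Because inference lives on the source itself, it can consult its own boundedness.
        if (getBoundedness() != Boundedness.BOUNDED) {
            return -1; // streaming read: make no dynamic suggestion
        }
        // Bounded read: one subtask per split, at least 1, capped by the upper bound.
        return Math.max(1, Math.min(splitCount, parallelismUpperBound));
    }
}
```

With this shape, a source serving a streaming read can opt out of inference
on its own, without the scheduler having to pass extra boundedness
information through the interface.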

Best regards,
Xia

Leonard Xu <xbjt...@gmail.com> wrote on Wed, Nov 8, 2023 at 16:19:

> Thanks Xia and Zhu Zhu for kicking off this discussion.
>
> The dynamic source parallelism inference is a useful feature for the batch
> story. I have some comments about the current design.
>
> 1. How can users disable the parallelism inference if they want to use a
> fixed source parallelism? They can currently configure a fixed parallelism
> in the table layer, as you explained above.
>
> 2. Could you explain the priority between the static parallelism set from
> the table layer and the proposed dynamic source parallelism? Also, changing
> the default value of `table.exec.hive.infer-source-parallelism` as a sub-task
> does not resolve all cases, because other sources can set their own
> parallelism too.
>
> 3. The current design only works for batch jobs. The workflow for a
> streaming job may look like (1) infer the parallelism for a streaming source
> like Kafka, (2) stop the job with a savepoint, (3) apply the new parallelism
> to the job, (4) schedule the streaming job from the savepoint, which is
> totally different; the latter lacks a lot of infra in Flink, right? So,
> could we consider the boundedness info when designing the interface? Both
> FileSource and Hive Source offer streaming read ability. Imagine this case:
> a Flink streaming Hive source should not apply the dynamic source
> parallelism even if it implements the feature, as it is serving a streaming
> job.
>
> Best,
> Leonard
>
>
> > On Nov 1, 2023, at 6:21 PM, Xia Sun <xingbe...@gmail.com> wrote:
> >
> > Thanks Lijie for the comments!
> > 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> > superset of static parallelism inference. As a follow-up task, we can
> > consider changing the default value of
> > 'table.exec.hive.infer-source-parallelism' to false.
> >
> > 2. I think that both dynamic parallelism inference and static parallelism
> > inference have their own use cases. Currently, for streaming sources and
> > other sources that are not sensitive to dynamic information, the benefits
> > of dynamic parallelism inference may not be significant. In such cases, we
> > can continue to use static parallelism inference.
> >
> > Thanks,
> > Xia
> >
> > Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Nov 1, 2023 at 14:52:
> >
> >> Hi Xia,
> >>
> >> Thanks for driving this FLIP, +1 for the proposal.
> >>
> >> I have 2 questions about the relationship between static inference and
> >> dynamic inference:
> >>
> >> 1. AFAIK, the Hive table source currently enables static inference by
> >> default. In this case, which one (static vs dynamic) will take effect? I
> >> think it would be better if we could point this out in the FLIP.
> >>
> >> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> >> we have a plan to deprecate static inference in the future?
> >>
> >> Best,
> >> Lijie
> >>
> >> Zhu Zhu <reed...@gmail.com> wrote on Tue, Oct 31, 2023 at 20:19:
> >>
> >>> Thanks for opening the FLIP and kicking off this discussion, Xia!
> >>> The proposed changes make up an important missing part of the dynamic
> >>> parallelism inference of adaptive batch scheduler.
> >>>
> >>> Besides that, it is also a good step towards supporting dynamic
> >>> parallelism inference for streaming sources, e.g. allowing Kafka
> >>> sources to determine their parallelism automatically based on the
> >>> number of partitions.
> >>>
> >>> +1 for the proposal.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Xia Sun <xingbe...@gmail.com> wrote on Tue, Oct 31, 2023 at 16:01:
> >>>
> >>>> Hi everyone,
> >>>> I would like to start a discussion on FLIP-379: Dynamic source
> >>>> parallelism inference for batch jobs[1].
> >>>>
> >>>> In general, there are three main ways to set source parallelism for
> >>>> batch jobs:
> >>>> (1) User-defined source parallelism.
> >>>> (2) Connector static parallelism inference.
> >>>> (3) Dynamic parallelism inference.
> >>>>
> >>>> Compared to manually setting parallelism, automatic parallelism
> >>>> inference is easier to use and can better adapt to varying data volumes
> >>>> each day. However, static parallelism inference cannot leverage runtime
> >>>> information, resulting in inaccurate parallelism inference. Therefore,
> >>>> for batch jobs, dynamic parallelism inference is the ideal choice, but
> >>>> the adaptive batch scheduler's support for it is currently not very
> >>>> comprehensive.
> >>>>
> >>>> Therefore, we aim to introduce a general interface that enables the
> >>>> adaptive batch scheduler to dynamically infer the source parallelism
> >>>> at runtime. Please refer to the FLIP[1] document for more details
> >>>> about the proposed design and implementation.
> >>>>
> >>>> I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> >>>> pre-discussion.
> >>>> Looking forward to your feedback and suggestions, thanks.
> >>>>
> >>>> [1]
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >>>>
> >>>> Best regards,
> >>>> Xia
> >>>>
> >>>
> >>
>
>
