Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-28 Thread Xia Sun
Hi everyone,

Thanks for all the comments! I will initiate the vote tomorrow if there is
no further discussion.

Best,
Xia

Leonard Xu wrote on Fri, Nov 24, 2023 at 18:50:

> Thanks Xia and Zhu Zhu for driving this work,
>
> It will help unify the parallelism inference for all operators of batch
> jobs; the updated FLIP looks good to me.
>
> Best,
> Leonard
>
>
> > On Nov 24, 2023, at 5:53 PM, Xia Sun wrote:
> >
> > Hi all,
> > We discussed offline with Zhu Zhu and Leonard Xu and reached the
> > following three points of consensus:
> >
> > 1. Rename the interface method Context#getMaxSourceParallelism proposed
> > by the FLIP to Context#getParallelismInferenceUpperBound, to make the
> > meaning of the method clearer. See [1] for details.
> >
> > 2. We provide a more detailed explanation of the effective priority of
> > the dynamic source parallelism inference proposed by this FLIP and the
> > order of values for the upper bound of source parallelism. We also
> > point out the current support and limitations of the
> > AdaptiveBatchScheduler regarding source parallelism inference. See [2]
> > for details.
> >
> > 3. This FLIP will focus only on the framework-level implementation and
> > will prioritize FileSource as an example implementation of the new
> > interface proposed by the FLIP. The HiveSource, due to its existing
> > static parallelism inference and changes in default values for
> > configuration items such as `table.exec.hive.infer-source-parallelism`,
> > requires a more detailed migration plan, as well as more comprehensive
> > design and discussion. It is not suitable as part of this FLIP and
> > needs a separate FLIP. Therefore, we have removed the HiveSource part
> > from this FLIP.
> >
> > Thanks again to everyone who participated in the discussion.
> > Looking forward to your continued feedback.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-IntroduceDynamicParallelismInferenceinterfaceforSource
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs#FLIP379:Dynamicsourceparallelisminferenceforbatchjobs-ConfigurationBehaviorChanges
> >
> > Best,
> > Xia
> >
> > Leonard Xu wrote on Wed, Nov 22, 2023 at 18:37:
> >
> >> Thanks for the reply, Xia, and sorry for my late response.
> >>
> >>> Thanks for pointing out the issue; the current wording does indeed
> >>> seem to be confusing. It involves the existing implementation of the
> >>> AdaptiveBatchScheduler, where the dynamically inferred parallelism
> >>> cannot exceed the JobVertex's maxParallelism (which is typically set
> >>> to either the global default max parallelism or the user-specified
> >>> JobVertex max parallelism), so the FLIP maintains this logic. I have
> >>> modified the FLIP to avoid confusion as much as possible.
> >>
> >> I didn’t see the change part in this FLIP, could you check it?
> >>
> >>> We can use Configuration::getOptional to check if the user has
> >>> configured
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
> >>
> >> Using Readable#getOptional(ConfigOption option) makes sense to me.
> >>
> >>> As a follow-up task, we may have a dedicated discussion in the
> >>> future to see if we need to change the default value of
> >>> `table.exec.hive.infer-source-parallelism` to false. Before then,
> >>> users can manually set `table.exec.hive.infer-source-parallelism` to
> >>> false to enable dynamic parallelism inference, and use
> >>> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >>> to replace `table.exec.hive.infer-source-parallelism.max` as the
> >>> parallelism inference upper bound. I have updated both the FLIP's
> >>> DynamicParallelismInference interface implementation and Migration
> >>> Plan modules to illustrate this.
> >>
> >> In my opinion, moving HiveSource to a subsequent discussion is not
> >> OK; let me explain. HiveSource support for dynamic source parallelism
> >> inference is one part of the FLIP implementation. It looks like we
> >> introduce a configuration
> >> `execution.batch.adaptive.auto-parallelism.default-source-parallelism`
> >> that conflicts with the existing configuration
> >> `table.exec.hive.infer-source-parallelism`, but we do not provide
> >> conflict resolution in the current FLIP and just postpone the work to
> >> future discussions. From past community work experience, this
> >> uncertain state is likely to cause the follow-up discussion to be
> >> shelved. I'd suggest we make this part clear in this FLIP.
> >>
> >>
> >> Best,
> >> Leonard




Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-20 Thread Xia Sun
Thanks Leonard for the detailed feedback and input.

> The 'Max source parallelism' is the information that the runtime offers
to the Source as a hint to infer the actual parallelism; a name with a max
prefix but calculated with a minimum value confuses me a lot, especially
when I read the HiveSource pseudocode:

> fileEnumerator.setMinNumSplits(maxSourceParallelism);

> Although I understand that naming is a complex topic in CS, could we
improve this method name a little?

Thanks for pointing out the issue; the current wording does indeed seem to
be confusing. It involves the existing implementation of the
AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot
exceed the JobVertex's maxParallelism (which is typically set to either the
global default max parallelism or the user-specified JobVertex max
parallelism), so the FLIP maintains this logic. I have modified the FLIP to
avoid confusion as much as possible.
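Xia's explanation above can be sketched as a small example. This is a
hedged illustration, not the FLIP's actual code: only the method name
Context#getParallelismInferenceUpperBound appears in this thread, while the
interface shape and the split-based inference rule are assumptions made for
illustration.

```java
// Hedged sketch of the renamed interface and the cap described above.
// Only Context#getParallelismInferenceUpperBound is taken from the thread;
// everything else (names, split-based inference) is illustrative.
public class DynamicInferenceSketch {

    /** Runtime-provided context; per the discussion, the upper bound
     *  already reflects the JobVertex's maxParallelism. */
    interface Context {
        int getParallelismInferenceUpperBound();
    }

    /** Toy source-side inference: one subtask per split, never exceeding
     *  the upper bound handed in by the scheduler. */
    static int inferParallelism(int numSplits, Context context) {
        return Math.min(numSplits, context.getParallelismInferenceUpperBound());
    }
}
```

For example, a source that discovers 100 splits under an upper bound of 32
would be capped at 32, while a source with 5 splits keeps 5.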

> And, the
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`
config has a default value of 1; how do we distinguish whether the user set
it or not, for example if the user happens to set the value 1?

We can use Configuration::getOptional to check if the user has configured
the `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
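The distinction Xia relies on can be seen with a minimal stand-in for
Flink's Configuration class (the class and its getOptional method are real
Flink API, but they are mimicked here with a plain map so the snippet is
self-contained; the default value of 1 follows Leonard's description):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Minimal stand-in illustrating why getOptional answers Leonard's question:
// it returns Optional.empty() when the user has not set the key, even
// though the option declares a default value (1 here).
public class ConfigSketch {
    static final String KEY =
        "execution.batch.adaptive.auto-parallelism.default-source-parallelism";

    private final Map<String, Integer> userSettings = new HashMap<>();

    void set(String key, int value) {
        userSettings.put(key, value);
    }

    // mirrors Configuration#getOptional: no fallback to the default value
    Optional<Integer> getOptional(String key) {
        return Optional.ofNullable(userSettings.get(key));
    }

    // mirrors a plain get: falls back to the declared default of 1
    int get(String key) {
        return userSettings.getOrDefault(key, 1);
    }
}
```

An unset option yields Optional.empty() while an explicit user setting of 1
yields Optional.of(1), so the two cases Leonard asked about are
distinguishable even though a plain get returns 1 either way.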

> No doubt that it's an API-breaking change. For existing Hive users, the
migration path is not clear in this FLIP; for example, current users use
the splits number to infer the source parallelism. After this FLIP, could
we give a recommended value for
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`,
explain how to set it, or do users not need to set anything at all? And the
replacement for migration should be added to
`table.exec.hive.infer-source-parallelism`'s description when we propose to
change its default value, right?

As a follow-up task, we may have a dedicated discussion in the future to
see if we need to change the default value of
`table.exec.hive.infer-source-parallelism` to false. Before then, users can
manually set `table.exec.hive.infer-source-parallelism` to false to enable
dynamic parallelism inference, and use
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
inference upper bound. I have updated both the FLIP's
DynamicParallelismInference interface implementation and Migration Plan
modules to illustrate this.

> The pseudocode shows:

> fileEnumerator.getInferredSourceParallelism();

> IIRC, our public API FileEnumerator never offers such a method; is
introducing getInferredSourceParallelism() also one part of our FLIP?

The intent was to make the pseudocode easier to understand, but it did
introduce some confusion. There are no plans to introduce
getInferredSourceParallelism() in HiveSource, and I've modified the
HiveSource pseudocode in the FLIP.

Best regards,

Xia


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-15 Thread Leonard Xu
Thanks Xia for the detailed reply.

>> `How does a user disable the parallelism inference if they want to use
>> fixed source parallelism?`
>> `Could you explain the priority of the static parallelism set from the
>> table layer and the proposed dynamic source parallelism?`
> 
> From the user's perspective, if the user specifies a fixed parallelism for
> the source, dynamic source parallelism inference will be automatically
> disabled. From the perspective of priority, the user’s specified
> parallelism > the static parallelism inference > dynamic parallelism
> inference. Because the dynamic source parallelism inference will take
> effect at the runtime stage and the validity conditions are: (1) the
> current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
> source vertex is not specified (that is, the parallelism is -1).

The priority explanation makes sense to me; could you also add this
explanation to the FLIP?

> `So, could we consider the boundedness info when designing the interface?
> Both FileSource and Hive Source offer streaming read ability; imagine
> this case: a Flink streaming Hive Source should not apply the dynamic
> source parallelism even if it implemented the feature, as it is serving a
> streaming job.`
> 
> Thanks for your feedback; it is really good input. Currently, the
> dynamic parallelism inference logic is only triggered in batch jobs.
> Therefore, the logic will not be called in streaming jobs.
> In the future, if streaming jobs also support runtime parallelism
> inference, then theoretically the source can no longer be distinguished
> between streaming jobs and batch jobs at the runtime stage. In addition,
> since the new interface is implemented together with the Source
> interface, the Source::getBoundedness() method can also be consulted when
> inferring parallelism.

+1 about the boundedness info part.

Okay, let's come back to the batch world and discuss some details of the
current design:

(1) About max source parallelism
The FLIP says: (1) Max source parallelism, which is calculated as the
minimum of the default source parallelism
(`execution.batch.adaptive.auto-parallelism.default-source-parallelism`)
and JobVertex#maxParallelism. If the default-source-parallelism is not set,
the global default parallelism is used as the default source parallelism.

The 'Max source parallelism' is the information that the runtime offers to
the Source as a hint to infer the actual parallelism; a name with a max
prefix but calculated with a minimum value confuses me a lot, especially
when I read the HiveSource pseudocode:

fileEnumerator.setMinNumSplits(maxSourceParallelism);

Although I understand that naming is a complex topic in CS, could we
improve this method name a little? And, the
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`
config has a default value of 1; how do we distinguish whether the user set
it or not, for example if the user happens to set the value 1?
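The calculation quoted from the FLIP above can be written out as a small
helper. This is only a sketch of the rule as stated in the thread, with
illustrative names; Flink's actual implementation in the
AdaptiveBatchScheduler is not reproduced here.

```java
import java.util.OptionalInt;

// Sketch of the quoted upper-bound rule: take the default source
// parallelism if set, else the global default parallelism, then cap it by
// JobVertex#maxParallelism. Despite the "max" name, the result is a
// minimum, which is exactly the naming confusion raised in this message.
public class UpperBoundSketch {
    static int parallelismUpperBound(
            OptionalInt defaultSourceParallelism, // user-set value, if any
            int globalDefaultParallelism,         // fallback when unset
            int vertexMaxParallelism) {           // JobVertex#maxParallelism
        int base = defaultSourceParallelism.orElse(globalDefaultParallelism);
        return Math.min(base, vertexMaxParallelism);
    }
}
```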

(2) About changing the default value of 'table.exec.hive.infer-source-parallelism'
The FLIP says: As a follow-up task, we will consider changing the default
value of 'table.exec.hive.infer-source-parallelism' to false.

No doubt that it's an API-breaking change. For existing Hive users, the
migration path is not clear in this FLIP; for example, current users use
the splits number to infer the source parallelism. After this FLIP, could
we give a recommended value for
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`,
explain how to set it, or do users not need to set anything at all? And the
replacement for migration should be added to
'table.exec.hive.infer-source-parallelism''s description when we propose to
change its default value, right?

(3) [minor] About the HiveSource
The pseudocode shows:

fileEnumerator.getInferredSourceParallelism();

IIRC, our public API FileEnumerator never offers such a method; is
introducing getInferredSourceParallelism() also one part of our FLIP?

Best,
Leonard



> 
> Best regards,
> Xia
> 
Leonard Xu wrote on Wed, Nov 8, 2023 at 16:19:
> 
>> Thanks Xia and Zhu Zhu for kicking off this discussion.
>> 
>> The dynamic source parallelism inference is a useful feature for the
>> batch story. I have some comments on the current design.
>> 
>> 1. How does a user disable the parallelism inference if they want to use
>> a fixed source parallelism? They can currently configure a fixed
>> parallelism in the table layer, as you explained above.
>> 
>> 2. Could you explain the priority of the static parallelism set from the
>> table layer and the proposed dynamic source parallelism? And changing
>> the default value of `table.exec.hive.infer-source-parallelism` as a
>> sub-task does not resolve all cases, because other Sources can set their
>> own parallelism too.
>> 
>> 3. The current design only works for batch jobs; the workflow for a
>> streaming job may look like (1) infer parallelism for a streaming source
>> like Kafka, (2) stop the job with a savepoint, (3) apply the new
>> parallelism for the job, (4) schedule the streaming 

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-09 Thread Xia Sun
Thanks Leonard for the feedback, and sorry for my late response.

> `How does a user disable the parallelism inference if they want to use
> fixed source parallelism?`
> `Could you explain the priority of the static parallelism set from the
> table layer and the proposed dynamic source parallelism?`

From the user's perspective, if the user specifies a fixed parallelism for
the source, dynamic source parallelism inference will be automatically
disabled. From the perspective of priority: the user's specified
parallelism > the static parallelism inference > dynamic parallelism
inference. The dynamic source parallelism inference takes effect at the
runtime stage, and its validity conditions are: (1) the current
ExecutionGraph is a dynamic graph, and (2) the parallelism of the source
vertex is not specified (that is, the parallelism is -1).
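The priority order described above can be sketched as a simple resolution
function. The names and the -1 "unspecified" sentinel follow the thread;
the method itself is illustrative, not Flink code.

```java
// Sketch of the stated priority:
// user-specified parallelism > static inference > dynamic inference.
public class PrioritySketch {
    static int resolveSourceParallelism(
            int userSpecified,   // -1 when the user did not fix a parallelism
            int staticInferred,  // -1 when table-layer static inference is off
            int dynamicInferred) {
        if (userSpecified > 0) {
            return userSpecified;  // a fixed parallelism disables all inference
        }
        if (staticInferred > 0) {
            return staticInferred; // e.g. compile-time split-based inference
        }
        return dynamicInferred;    // runtime inference on a dynamic graph
    }
}
```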

> `the workflow for a streaming job may look like ... which is totally
different; the latter lacks a lot of infra in Flink, right?`

Indeed, as of now, the dynamic parallelism inference is exclusively for
batch jobs, so it only takes into account the necessary information for
batch scenarios. In the future, when we introduce support for automatic
parallelism inference in streaming jobs, we can include the required
information for streaming jobs to avoid unnecessarily complicating the
current design.
Moreover, the workflow you mentioned seems a bit complicated. Our current
idea is to perform the parallelism inference during the initialization
phase of streaming jobs and proceed to schedule the entire job once the
source parallelism is determined. This process will naturally occur during
job startup, eliminating the need for additional restarts.

> `So, could we consider the boundedness info when designing the interface?
Both FileSource and Hive Source offer streaming read ability; imagine this
case: a Flink streaming Hive Source should not apply dynamic source
parallelism even if it implemented the feature, as it is serving a
streaming job.`

Thanks for your feedback, it is really good input. Currently, the dynamic
parallelism inference logic is only triggered in batch jobs, so it will not
be invoked for streaming jobs.
In the future, if streaming jobs also support runtime parallelism
inference, then, theoretically, a source serving a streaming job can no
longer be distinguished from one serving a batch job at the runtime stage.
In addition, since the new interface is implemented together with the
Source interface, the boundedness can also be obtained via
Source::getBoundedness() when inferring parallelism.
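As a small illustration of that last point, here is a hedged sketch of such a boundedness check. The enum mirrors Flink's `Boundedness`, but the guard method itself is a hypothetical illustration, not the FLIP's implementation:

```java
// Illustrative guard: consult the source's boundedness before applying
// dynamic parallelism inference. Not the actual FLIP-379 code.
class BoundednessGuard {

    // Mirrors org.apache.flink.api.connector.source.Boundedness.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /**
     * Dynamic source parallelism inference should only apply to bounded
     * (batch-style) reads; an unbounded source serving a streaming job
     * keeps its configured parallelism.
     */
    static boolean shouldApplyDynamicInference(Boundedness boundedness) {
        return boundedness == Boundedness.BOUNDED;
    }
}
```

This is how a source such as HiveSource, which supports both bounded and streaming reads, could be handled by one interface without misapplying the feature to streaming jobs.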

Best regards,
Xia

Leonard Xu  于2023年11月8日周三 16:19写道:

> Thanks Xia and Zhu Zhu for kicking off this discussion.
>
> The dynamic source parallelism inference is a useful feature for the batch
> story. I have some comments about the current design.
>
> 1. How can users disable parallelism inference if they want to use a fixed
> source parallelism? They can currently configure a fixed parallelism at the
> table layer, as you explained above.
>
> 2. Could you explain the priority between the static parallelism set at the
> table layer and the proposed dynamic source parallelism? And changing the
> default value of `table.exec.hive.infer-source-parallelism` as a sub-task
> does not resolve all cases, because other Sources can set their own
> parallelism too.
>
> 3. The current design only works for batch jobs; the workflow for a
> streaming job may look like: (1) infer parallelism for a streaming source
> like Kafka, (2) stop the job with a savepoint, (3) apply the new
> parallelism to the job, (4) schedule the streaming job from the savepoint.
> This is totally different, and the latter lacks a lot of infra in Flink,
> right? So, could we consider the boundedness info when designing the
> interface? Both FileSource and Hive Source offer streaming read ability;
> imagine this case: a Flink streaming Hive Source should not apply dynamic
> source parallelism even if it implemented the feature, as it is serving a
> streaming job.
>
> Best,
> Leonard
>
>
> > 2023年11月1日 下午6:21,Xia Sun  写道:
> >
> > Thanks Lijie for the comments!
> > 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> > superset of static parallelism inference. As a follow-up task, we can
> > consider changing the default value of
> > 'table.exec.hive.infer-source-parallelism' to false.
> >
> > 2. I think that both dynamic parallelism inference and static parallelism
> > inference have their own use cases. Currently, for streaming sources and
> > other sources that are not sensitive to dynamic information, the benefits
> > of dynamic parallelism inference may not be significant. In such cases,
> we
> > can continue to use static parallelism inference.
> >
> > Thanks,
> > Xia
> >
> > Lijie Wang  于2023年11月1日周三 14:52写道:
> >
> >> Hi Xia,
> >>
> >> Thanks for driving this FLIP, +1 for the proposal.
> >>
> >> I have 2 questions about the relationship between static inference and
> >> dynamic inference:
> >>
> >> 1. AFAIK, currently the hive table source enable static inference by
> 

Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-08 Thread Leonard Xu
Thanks Xia and Zhu Zhu for kicking off this discussion.

The dynamic source parallelism inference is a useful feature for the batch
story. I have some comments about the current design.

1. How can users disable parallelism inference if they want to use a fixed
source parallelism? They can currently configure a fixed parallelism at the
table layer, as you explained above.

2. Could you explain the priority between the static parallelism set at the
table layer and the proposed dynamic source parallelism? And changing the
default value of `table.exec.hive.infer-source-parallelism` as a sub-task
does not resolve all cases, because other Sources can set their own
parallelism too.

3. The current design only works for batch jobs; the workflow for a
streaming job may look like: (1) infer parallelism for a streaming source
like Kafka, (2) stop the job with a savepoint, (3) apply the new
parallelism to the job, (4) schedule the streaming job from the savepoint.
This is totally different, and the latter lacks a lot of infra in Flink,
right? So, could we consider the boundedness info when designing the
interface? Both FileSource and Hive Source offer streaming read ability;
imagine this case: a Flink streaming Hive Source should not apply dynamic
source parallelism even if it implemented the feature, as it is serving a
streaming job.

Best,
Leonard


> 2023年11月1日 下午6:21,Xia Sun  写道:
> 
> Thanks Lijie for the comments!
> 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> superset of static parallelism inference. As a follow-up task, we can
> consider changing the default value of
> 'table.exec.hive.infer-source-parallelism' to false.
> 
> 2. I think that both dynamic parallelism inference and static parallelism
> inference have their own use cases. Currently, for streaming sources and
> other sources that are not sensitive to dynamic information, the benefits
> of dynamic parallelism inference may not be significant. In such cases, we
> can continue to use static parallelism inference.
> 
> Thanks,
> Xia
> 
> Lijie Wang  于2023年11月1日周三 14:52写道:
> 
>> Hi Xia,
>> 
>> Thanks for driving this FLIP, +1 for the proposal.
>> 
>> I have 2 questions about the relationship between static inference and
>> dynamic inference:
>> 
> >> 1. AFAIK, currently the hive table source enables static inference by
> >> default. In this case, which one (static vs dynamic) will take effect? I
> >> think it would be better if we could point this out in the FLIP.
> >> 
> >> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> >> we have a plan to deprecate the static inference in the future?
>> 
>> Best,
>> Lijie
>> 
>> Zhu Zhu  于2023年10月31日周二 20:19写道:
>> 
>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
>>> The proposed changes make up an important missing part of the dynamic
>>> parallelism inference of adaptive batch scheduler.
>>> 
>>> Besides that, it is also one good step towards supporting dynamic
>>> parallelism inference for streaming sources, e.g. allowing Kafka
>>> sources to determine its parallelism automatically based on the
>>> number of partitions.
>>> 
>>> +1 for the proposal.
>>> 
>>> Thanks,
>>> Zhu
>>> 
>>> Xia Sun  于2023年10月31日周二 16:01写道:
>>> 
 Hi everyone,
 I would like to start a discussion on FLIP-379: Dynamic source
>>> parallelism
 inference for batch jobs[1].
 
 In general, there are three main ways to set source parallelism for
>> batch
 jobs:
 (1) User-defined source parallelism.
 (2) Connector static parallelism inference.
 (3) Dynamic parallelism inference.
 
 Compared to manually setting parallelism, automatic parallelism
>> inference
 is easier to use and can better adapt to varying data volumes each day.
 However, static parallelism inference cannot leverage runtime
>>> information,
 resulting in inaccurate parallelism inference. Therefore, for batch
>> jobs,
 dynamic parallelism inference is the most ideal, but currently, the
>>> support
 for adaptive batch scheduler is not very comprehensive.
 
 Therefore, we aim to introduce a general interface that enables the
 adaptive batch scheduler to dynamically infer the source parallelism at
 runtime. Please refer to the FLIP[1] document for more details about
>> the
 proposed design and implementation.
 
 I also thank Zhu Zhu and LiJie Wang for their suggestions during the
 pre-discussion.
 Looking forward to your feedback and suggestions, thanks.
 
 [1]
 
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
 
 Best regards,
 Xia
 
>>> 
>> 



Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-01 Thread Xia Sun
Thanks Lijie for the comments!
1. For Hive source, dynamic parallelism inference in batch scenarios is a
superset of static parallelism inference. As a follow-up task, we can
consider changing the default value of
'table.exec.hive.infer-source-parallelism' to false.
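For reference, opting out of Hive static source parallelism inference is already a one-line setting today (the key exists; only its default value would change in the proposed follow-up). A hedged SQL-client example:

```sql
-- Disable Hive static source parallelism inference, so only user-set
-- parallelism or dynamic inference applies.
SET 'table.exec.hive.infer-source-parallelism' = 'false';
```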

2. I think that both dynamic parallelism inference and static parallelism
inference have their own use cases. Currently, for streaming sources and
other sources that are not sensitive to dynamic information, the benefits
of dynamic parallelism inference may not be significant. In such cases, we
can continue to use static parallelism inference.

Thanks,
Xia

Lijie Wang  于2023年11月1日周三 14:52写道:

> Hi Xia,
>
> Thanks for driving this FLIP, +1 for the proposal.
>
> I have 2 questions about the relationship between static inference and
> dynamic inference:
>
> 1. AFAIK, currently the hive table source enables static inference by
> default. In this case, which one (static vs dynamic) will take effect? I
> think it would be better if we could point this out in the FLIP.
>
> 2. As you mentioned above, dynamic inference is the most ideal way, so do
> we have a plan to deprecate the static inference in the future?
>
> Best,
> Lijie
>
> Zhu Zhu  于2023年10月31日周二 20:19写道:
>
> > Thanks for opening the FLIP and kicking off this discussion, Xia!
> > The proposed changes make up an important missing part of the dynamic
> > parallelism inference of adaptive batch scheduler.
> >
> > Besides that, it is also one good step towards supporting dynamic
> > parallelism inference for streaming sources, e.g. allowing Kafka
> > sources to determine their parallelism automatically based on the
> > number of partitions.
> >
> > +1 for the proposal.
> >
> > Thanks,
> > Zhu
> >
> > Xia Sun  于2023年10月31日周二 16:01写道:
> >
> > > Hi everyone,
> > > I would like to start a discussion on FLIP-379: Dynamic source
> > parallelism
> > > inference for batch jobs[1].
> > >
> > > In general, there are three main ways to set source parallelism for
> batch
> > > jobs:
> > > (1) User-defined source parallelism.
> > > (2) Connector static parallelism inference.
> > > (3) Dynamic parallelism inference.
> > >
> > > Compared to manually setting parallelism, automatic parallelism
> inference
> > > is easier to use and can better adapt to varying data volumes each day.
> > > However, static parallelism inference cannot leverage runtime
> > information,
> > > resulting in inaccurate parallelism inference. Therefore, for batch
> jobs,
> > > dynamic parallelism inference is the most ideal, but currently, the
> > support
> > > for adaptive batch scheduler is not very comprehensive.
> > >
> > > Therefore, we aim to introduce a general interface that enables the
> > > adaptive batch scheduler to dynamically infer the source parallelism at
> > > runtime. Please refer to the FLIP[1] document for more details about
> the
> > > proposed design and implementation.
> > >
> > > I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> > > pre-discussion.
> > > Looking forward to your feedback and suggestions, thanks.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> > >
> > > Best regards,
> > > Xia
> > >
> >
>


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-01 Thread Lijie Wang
Hi Xia,

Thanks for driving this FLIP, +1 for the proposal.

I have 2 questions about the relationship between static inference and
dynamic inference:

1. AFAIK, currently the hive table source enables static inference by
default. In this case, which one (static vs dynamic) will take effect? I
think it would be better if we could point this out in the FLIP.

2. As you mentioned above, dynamic inference is the most ideal way, so do
we have a plan to deprecate the static inference in the future?

Best,
Lijie

Zhu Zhu  于2023年10月31日周二 20:19写道:

> Thanks for opening the FLIP and kicking off this discussion, Xia!
> The proposed changes make up an important missing part of the dynamic
> parallelism inference of adaptive batch scheduler.
>
> Besides that, it is also one good step towards supporting dynamic
> parallelism inference for streaming sources, e.g. allowing Kafka
> sources to determine their parallelism automatically based on the
> number of partitions.
>
> +1 for the proposal.
>
> Thanks,
> Zhu
>
> Xia Sun  于2023年10月31日周二 16:01写道:
>
> > Hi everyone,
> > I would like to start a discussion on FLIP-379: Dynamic source
> parallelism
> > inference for batch jobs[1].
> >
> > In general, there are three main ways to set source parallelism for batch
> > jobs:
> > (1) User-defined source parallelism.
> > (2) Connector static parallelism inference.
> > (3) Dynamic parallelism inference.
> >
> > Compared to manually setting parallelism, automatic parallelism inference
> > is easier to use and can better adapt to varying data volumes each day.
> > However, static parallelism inference cannot leverage runtime
> information,
> > resulting in inaccurate parallelism inference. Therefore, for batch jobs,
> > dynamic parallelism inference is the most ideal, but currently, the
> support
> > for adaptive batch scheduler is not very comprehensive.
> >
> > Therefore, we aim to introduce a general interface that enables the
> > adaptive batch scheduler to dynamically infer the source parallelism at
> > runtime. Please refer to the FLIP[1] document for more details about the
> > proposed design and implementation.
> >
> > I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> > pre-discussion.
> > Looking forward to your feedback and suggestions, thanks.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
> >
> > Best regards,
> > Xia
> >
>


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-10-31 Thread Zhu Zhu
Thanks for opening the FLIP and kicking off this discussion, Xia!
The proposed changes make up an important missing part of the dynamic
parallelism inference of the adaptive batch scheduler.

Besides that, it is also a good step towards supporting dynamic
parallelism inference for streaming sources, e.g. allowing Kafka
sources to determine their parallelism automatically based on the
number of partitions.

+1 for the proposal.

Thanks,
Zhu

Xia Sun  于2023年10月31日周二 16:01写道:

> Hi everyone,
> I would like to start a discussion on FLIP-379: Dynamic source parallelism
> inference for batch jobs[1].
>
> In general, there are three main ways to set source parallelism for batch
> jobs:
> (1) User-defined source parallelism.
> (2) Connector static parallelism inference.
> (3) Dynamic parallelism inference.
>
> Compared to manually setting parallelism, automatic parallelism inference
> is easier to use and can better adapt to varying data volumes each day.
> However, static parallelism inference cannot leverage runtime information,
> resulting in inaccurate parallelism inference. Therefore, for batch jobs,
> dynamic parallelism inference is the most ideal, but currently, the support
> for adaptive batch scheduler is not very comprehensive.
>
> Therefore, we aim to introduce a general interface that enables the
> adaptive batch scheduler to dynamically infer the source parallelism at
> runtime. Please refer to the FLIP[1] document for more details about the
> proposed design and implementation.
>
> I also thank Zhu Zhu and LiJie Wang for their suggestions during the
> pre-discussion.
> Looking forward to your feedback and suggestions, thanks.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
>
> Best regards,
> Xia
>


[DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-10-31 Thread Xia Sun
Hi everyone,
I would like to start a discussion on FLIP-379: Dynamic source parallelism
inference for batch jobs[1].

In general, there are three main ways to set source parallelism for batch
jobs:
(1) User-defined source parallelism.
(2) Connector static parallelism inference.
(3) Dynamic parallelism inference.

Compared to manually setting parallelism, automatic parallelism inference
is easier to use and can better adapt to the varying data volumes of each
day. However, static parallelism inference cannot leverage runtime
information, resulting in inaccurate parallelism inference. Therefore, for
batch jobs, dynamic parallelism inference is the most ideal, but currently
the adaptive batch scheduler's support for it is not very comprehensive.
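As a rough illustration of why runtime information helps: dynamic inference can derive a parallelism from the data volume actually observed at runtime, capped by an upper bound. The formula and names below are assumptions for illustration, not the adaptive batch scheduler's exact logic:

```java
// Illustrative sketch: derive a source parallelism from runtime data volume.
class VolumeBasedInference {

    /**
     * @param totalDataBytes    total bytes the source will read (runtime info)
     * @param dataVolumePerTask desired bytes processed per subtask
     * @param upperBound        max parallelism allowed for this source
     */
    static int inferParallelism(long totalDataBytes, long dataVolumePerTask, int upperBound) {
        // ceil(totalDataBytes / dataVolumePerTask), with at least 1 subtask
        long needed = (totalDataBytes + dataVolumePerTask - 1) / dataVolumePerTask;
        return (int) Math.min(Math.max(needed, 1), upperBound);
    }
}
```

A static inference pass would have to guess `totalDataBytes` before execution, which is exactly where the inaccuracy comes from.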

Therefore, we aim to introduce a general interface that enables the
adaptive batch scheduler to dynamically infer the source parallelism at
runtime. Please refer to the FLIP[1] document for more details about the
proposed design and implementation.
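For readers skimming the thread, here is a hedged sketch of what such a general interface could look like, based only on the method names mentioned in this discussion (Context#getParallelismInferenceUpperBound, Source::getBoundedness). It is an illustration of the shape of the proposal, not the FLIP's final API:

```java
// Illustrative sketch of a dynamic-inference opt-in interface for sources.
class DynamicInferenceSketch {

    /** Runtime context the scheduler would pass to the source. */
    interface Context {
        int getParallelismInferenceUpperBound();
    }

    /** A source opts in to dynamic inference by implementing this. */
    interface DynamicParallelismInference {
        int inferParallelism(Context context);
    }

    /** Toy source that wants one subtask per split, capped by the bound. */
    static class ToySource implements DynamicParallelismInference {
        final int splitCount;

        ToySource(int splitCount) {
            this.splitCount = splitCount;
        }

        @Override
        public int inferParallelism(Context context) {
            return Math.min(splitCount, context.getParallelismInferenceUpperBound());
        }
    }
}
```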

I also thank Zhu Zhu and LiJie Wang for their suggestions during the
pre-discussion.
Looking forward to your feedback and suggestions, thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs

Best regards,
Xia