[ 
https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878747#comment-17878747
 ] 

yuanfenghu edited comment on FLINK-36192 at 9/3/24 7:18 AM:
------------------------------------------------------------

{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx}*  {*}unable to scale up due to the limit of the  ({*}{*}number 
of{*} {*}source partitions\ maxParallelism). expected p{*}{*}arallelism : xxx , 
actual (partition \{*} {*}maxParallelism{*}{*}): xxx{*}


was (Author: JIRAUSER296932):
{quote}Overall LGTM, could we involve the vertex id in the message? It's useful 
to distinguish which source is limited when one job has multiple sources.
{quote}
How about this:

*vertex \{xxx}*  {*}unable to scale up due to the limit of the  ({*}{*}number 
of{*} {*}source partitions\ maxParallelism). expected p{*}{*}arallelism : xxx , 
actual (partition \{*} {*}maxParallelism{*}{*}): xxx{*}

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36192
>                 URL: https://issues.apache.org/jira/browse/FLINK-36192
>             Project: Flink
>          Issue Type: Improvement
>          Components: Autoscaler
>            Reporter: yuanfenghu
>            Priority: Minor
>
> *Description:*
> We hope that when we know the number of partitions of Kafka data, we can try 
> our best to make the parallelism of tasks that consume Kafka equal to the 
> common divisor of the partitions, so that the tasks that are consumed can be 
> balanced.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the processing rate of the task target and the corresponding 
> parallelism p1
> step2: Use the currently calculated degree of parallelism and the maximum 
> degree of parallelism of the operator to calculate, and take out the greatest 
> common divisor p2 of the maximum degree of parallelism / 2. If p2 < 
> maxparalleliem / 2, use p2 as the final degree of parallelism. If p2 > 
> maxparalleliem / 2 then use p1 as the final parallelism
> If the task that needs to be judged is a task that consumes Kafka or Pulsar, 
> the maximum parallelism of the task will be determined first: if the number 
> of partitions < the maximum parallelism of the current task, then the maximum 
> parallelism of the current task is the number of partitions of Kafka or 
> Pulsar. , otherwise the maximum degree of parallelism remains unchanged, so 
> there are the following situations:
> When the number of partitions in kafka or pulsar is less than the maximum 
> parallelism of the operator
> 1. If the parallelism calculated in step 1 <the number of kafka or pulsar 
> partitions/2, then the demand is met and the number of tasks can be balanced.
> 2. If the parallelism calculated in step 1 > the number of kafka or pulsar 
> partitions / 2, use the parallelism calculated in step 1. At this time, the 
> consumption will become unbalanced. For example, the number of partitions in 
> kafka is 64, and the expected parallelism calculated in step 1 is If the 
> degree is 48, the final task parallelism degree is 48
> When the number of partitions in kafka or pulsar is greater than the maximum 
> parallelism of the operator
> Calculate the parallelism completely according to the logic of step 1. For 
> example, the parallelism of one of my kafka partitions is 200, and the 
> maximum parallelism of the operator is 128. Then the calculated parallelism 
> is 2, 4, 8, 16... It is very likely that Kafka cannot be consumed evenly
>  
> {*}expect logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> determine the number of parallelism of the task as the common divisor of the 
> number of partitions.
>  * When the number of partitions is greater than the maximum parallelism, the 
> number of parallelism of the task is determined to be the common divisor of 
> the number of partitions but does not exceed the maximum parallelism.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to