[ https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878735#comment-17878735 ]
Rui Fan commented on FLINK-36192:
---------------------------------

{quote}
like eventType='Warning' reason = 'scaleUpLimited' message = 'currently unable to scale up due to the limit of the number of source partitions.' ?
{quote}

Overall LGTM. Could we include the vertex ID in the message? It would help to distinguish which source is limited when one job has multiple sources.

> Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.
> ------------------------------------------------------------------------
>
>                 Key: FLINK-36192
>                 URL: https://issues.apache.org/jira/browse/FLINK-36192
>             Project: Flink
>          Issue Type: Improvement
>          Components: Autoscaler
>            Reporter: yuanfenghu
>            Priority: Minor
>
> *Description:*
> When the number of partitions of the Kafka source is known, we would like the parallelism of the tasks that consume Kafka to be a divisor of the partition count wherever possible, so that the partitions are spread evenly across the consuming tasks.
>
> *Current logic:*
> Currently, the autoscaler determines the parallelism of a task as follows:
> Step 1: Calculate the target processing rate of the task and the corresponding parallelism p1.
> Step 2: From p1 and the operator's maximum parallelism (maxParallelism), derive a divisor p2 of maxParallelism. If p2 < maxParallelism / 2, use p2 as the final parallelism. If p2 > maxParallelism / 2, use p1 as the final parallelism.
>
> If the task consumes Kafka or Pulsar, its maximum parallelism is determined first: if the number of partitions is less than the task's current maximum parallelism, the maximum parallelism becomes the number of Kafka or Pulsar partitions; otherwise the maximum parallelism remains unchanged. This leads to the following situations.
>
> When the number of Kafka or Pulsar partitions is less than the maximum parallelism of the operator:
> 1. If the parallelism calculated in step 1 < the number of partitions / 2, the requirement is met and the partitions can be balanced across the tasks.
> 2. If the parallelism calculated in step 1 > the number of partitions / 2, the parallelism calculated in step 1 is used, and consumption becomes unbalanced. For example, if Kafka has 64 partitions and the parallelism calculated in step 1 is 48, the final task parallelism is 48.
>
> When the number of Kafka or Pulsar partitions is greater than the maximum parallelism of the operator:
> The parallelism is calculated entirely according to the logic of step 1. For example, one of my Kafka topics has 200 partitions and the maximum parallelism of the operator is 128. The calculated parallelism is then 2, 4, 8, 16... so it is very likely that Kafka cannot be consumed evenly.
>
> *Expected logic:*
> * When the number of partitions is less than the maximum parallelism, set the parallelism of the task to a divisor of the number of partitions.
> * When the number of partitions is greater than the maximum parallelism, set the parallelism of the task to a divisor of the number of partitions that does not exceed the maximum parallelism.
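The expected logic can be illustrated with a minimal Java sketch. This is not the Flink autoscaler implementation: the class name `PartitionAlignedParallelism` and the method `alignToPartitions` are hypothetical, and the search order is an assumption the issue does not specify (look upward from the step 1 result first so capacity is not reduced, then fall back to the largest divisor below it).

```java
public class PartitionAlignedParallelism {

    /**
     * Hypothetical helper, not part of the Flink autoscaler API.
     *
     * Returns a parallelism that divides numPartitions evenly and does not
     * exceed maxParallelism, staying as close as possible to the target.
     */
    public static int alignToPartitions(
            int targetParallelism, int numPartitions, int maxParallelism) {
        if (targetParallelism < 1 || numPartitions < 1 || maxParallelism < 1) {
            throw new IllegalArgumentException("all arguments must be >= 1");
        }

        // More subtasks than partitions would leave some subtasks idle,
        // so the partition count also caps the parallelism.
        int upperBound = Math.min(numPartitions, maxParallelism);
        int start = Math.min(targetParallelism, upperBound);

        // Assumption: prefer the smallest divisor at or above the target.
        for (int p = start; p <= upperBound; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }

        // Assumption: otherwise fall back to the largest divisor below it.
        for (int p = start - 1; p >= 1; p--) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        return 1; // not reached: 1 divides every partition count
    }

    public static void main(String[] args) {
        // 64 partitions, step 1 result 48: each subtask reads one partition.
        System.out.println(alignToPartitions(48, 64, 128));  // prints 64
        // 200 partitions, max parallelism 128, step 1 result 30.
        System.out.println(alignToPartitions(30, 200, 128)); // prints 40
        // No divisor of 200 lies in [110, 128], so the fallback applies.
        System.out.println(alignToPartitions(110, 200, 128)); // prints 100
    }
}
```

With the 64-partition example from the description, a step 1 result of 48 becomes 64 instead of staying at 48, so every subtask reads exactly one partition. With 200 partitions and a maximum parallelism of 128, the result is always one of 1, 2, 4, 5, 8, 10, 20, 25, 40, 50 or 100.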
--
This message was sent by Atlassian Jira (v8.20.10#820010)