Re: Poor Load Balancing across TaskManagers for Multiple Kafka Sources

2024-06-05 Thread Kevin Lam
cc. panyuep...@apache.org, as this relates to FLIP-370

On Wed, Jun 5, 2024 at 2:32 PM Kevin Lam wrote:

> Hey all,
>
> I'm seeing an issue with poor load balancing across TaskManagers for Kafka
> Sources using the Flink SQL API, and I'm wondering whether FLIP-370 will
> help with it. If not, I'm interested in any ideas the community has to
> mitigate the issue.
>
> The Kafka SplitEnumerator uses the following logic to assign split owners
> (code pointer:
> https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469
> ):
>
> ```
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>     return (startIndex + tp.partition()) % numReaders;
> }
> ```
>
> However, this can result in an imbalanced distribution of Kafka partition
> consumers across TaskManagers.
>
> To illustrate, I created a pipeline that consumes from 2 Kafka topics,
> each with 8 partitions, and just sinks them to a blackhole connector sink.
> With a parallelism of 16 and 1 task slot per TaskManager, we'd ideally
> expect each TaskManager to get its own Kafka partition, i.e. the 16
> partitions (8 from each topic) split evenly across TaskManagers. However,
> due to the algorithm linked above and how the startIndex is computed, I
> have observed several TaskManagers with 2 partitions (one from each topic)
> and some TaskManagers completely idle (see the sketch below this message).
>
> I've also run an experiment with the same pipeline where I set the
> parallelism such that each TaskManager gets exactly 1 partition, and
> compared it against the case where each TaskManager gets exactly 2
> partitions (one from each topic). I ensured this was the case by setting
> an appropriate parallelism and running the jobs on an application cluster;
> since the partitions are fixed, any extra parallelism isn't used. The case
> with exactly 1 partition per TaskManager processes a fixed set of data 20%
> faster.
>
> I was reading FLIP-370
> (https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling)
> and understand it will improve task scheduling in certain scenarios. Will
> FLIP-370 help with this KafkaSource scenario? If not, are there any ideas
> to improve the subtask scheduling for KafkaSources? Ideally we shouldn't
> need to carefully consider the partition and resulting task distribution
> when selecting our parallelism values.
>
> Thanks for your help!
>
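
To make the skew concrete, here is a minimal, self-contained sketch of the
assignment math above. The topic names are hypothetical; whether a given pair
of topics collides depends on their actual hash codes, but with 16 readers and
two 8-partition topics, the two blocks of consecutive readers overlap unless
the start indexes happen to differ by exactly 8.

```
public class SplitOwnerSkewDemo {

    // Same formula as KafkaSourceEnumerator.getSplitOwner, inlined on the
    // topic name so the sketch runs without a Kafka dependency.
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 16;
        int[] splitsPerReader = new int[numReaders];
        // Hypothetical topic names; substitute your own to check a real job.
        for (String topic : new String[] {"topic-a", "topic-b"}) {
            for (int partition = 0; partition < 8; partition++) {
                splitsPerReader[getSplitOwner(topic, partition, numReaders)]++;
            }
        }
        // Readers with 0 splits correspond to idle TaskManagers; readers
        // with 2 splits carry double the load.
        System.out.println(java.util.Arrays.toString(splitsPerReader));
    }
}
```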


Re: Poor Load Balancing across TaskManagers for Multiple Kafka Sources

2024-06-05 Thread Kevin Lam
I've also just found https://issues.apache.org/jira/browse/FLINK-31762,
which tracks the Kafka-specific issue.


Re: Poor Load Balancing across TaskManagers for Multiple Kafka Sources

2024-06-05 Thread Zhanghao Chen
Hi Kevin,

The problem here is how to evenly distribute partitions from multiple Kafka
topics to tasks, whereas FLIP-370 is only concerned with how to evenly
distribute tasks to slots and TaskManagers, so FLIP-370 won't help here.

Best,
Zhanghao Chen


Re: Poor Load Balancing across TaskManagers for Multiple Kafka Sources

2024-06-06 Thread Kevin Lam
Thanks for the response, Zhanghao. Since FLIP-370 won't help, any ideas on
how this could be improved? Could we round-robin assign partitions from all
Kafka topics to TaskManagers, as suggested in
https://issues.apache.org/jira/browse/FLINK-31762? A rough sketch of that
idea follows below.
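
For discussion, a minimal sketch of that round-robin idea (not the actual
Flink implementation, and glossing over concurrency and checkpointing) might
look like the following. Note that, unlike the current hash-based scheme, the
owner is no longer a pure function of topic and partition: it depends on
discovery order, so a real implementation would need to persist assignments
in the enumerator checkpoint to keep them stable across failover.

```
import java.util.HashMap;
import java.util.Map;

/** Hypothetical round-robin split-owner assignment across all topics. */
public class RoundRobinSplitOwnerSketch {

    // Remembers the owner of each topic-partition so repeated lookups are
    // stable within a single enumerator instance.
    private final Map<String, Integer> assignments = new HashMap<>();
    private int nextReader = 0;

    /** Assigns each newly discovered topic-partition to the next reader in turn. */
    int getSplitOwner(String topic, int partition, int numReaders) {
        return assignments.computeIfAbsent(
                topic + "-" + partition,
                // floorMod keeps the result non-negative even if the
                // counter ever wraps around.
                key -> Math.floorMod(nextReader++, numReaders));
    }
}
```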

>