[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuto Kawamura closed KAFKA-3775. -------------------------------- > Throttle maximum number of tasks assigned to a single KafkaStreams > ------------------------------------------------------------------ > > Key: KAFKA-3775 > URL: https://issues.apache.org/jira/browse/KAFKA-3775 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.0.0 > Reporter: Yuto Kawamura > Assignee: Yuto Kawamura > Fix For: 0.10.1.0 > > > As of today, if I start a Kafka Streams app on a single machine which > consists of single KafkaStreams instance, that instance gets all partitions > of the target topic assigned. > As we're using it to process topics which has huge number of partitions and > message traffic, it is a problem that we don't have a way of throttling the > maximum amount of partitions assigned to a single instance. > In fact, when we started a Kafka Streams app which consumes a topic which has > more than 10MB/sec traffic of each partition we saw that all partitions > assigned to the first instance and soon the app dead by OOM. > I know that there's some workarounds considerable here. for example: > - Start multiple instances at once so the partitions distributed evenly. > => Maybe works. but as Kafka Streams is a library but not an execution > framework, there's no predefined procedure of starting Kafka Streams apps so > some users might wanna take an option to start the first single instance and > check if it works as expected with lesster number of partitions(I want :p) > - Adjust config parameters such as {{buffered.records.per.partition}}, > {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap > pressure. > => Maybe works. but still have two problems IMO: > - Still leads traffic explosion with high throughput processing as it > accepts all incoming messages from hundreads of partitions. > - In the first place, by the distributed system principle, it's wired that > users don't have a away to control maximum "partitions" assigned to a single > shard(an instance of KafkaStreams here). Users should be allowed to provide > the maximum amount of partitions that is considered as possible to be > processed with single instance(or host). > Here, I'd like to introduce a new configuration parameter > {{max.tasks.assigned}}, which limits the number of tasks(a notion of > partition) assigned to the processId(which is the notion of single > KafkaStreams instance). > At the same time we need to change StreamPartitionAssignor(TaskAssignor) to > tolerate the incomplete assignment. That is, Kafka Streams should continue > working for the part of partitions even there are some partitions left > unassigned, in order to satisfy this> "user may want to take an option to > start the first single instance and check if it works as expected with > lesster number of partitions(I want :p)". > I've implemented the rough POC for this. PTAL and if it make sense I will > continue sophisticating it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)