[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325258#comment-15325258
 ] 

Guozhang Wang commented on KAFKA-3775:
--------------------------------------

Hi [~kawamuray], thanks for sharing your usage scenarios; they are very helpful 
for us in improving the user experience.

In the long run, we definitely would like to make memory management in Kafka 
Streams more convenient, since 1) many users may run their applications in a 
container with a strict memory limit, and 2) we want to control the case where 
task migration caused by, say, failures triggers cascading OOMs on other 
instances because of the sudden increase in memory needed for the new tasks. 
This is similar to your scenario, just in the reverse order: going from 
multiple instances to fewer instances. And I agree that the static 
{{partition.grouper}} config is not best suited here. There is already some 
discussion of this in the KIP-63 thread, which I will try to summarize in a 
centralized wiki.

In the near term, we can remove the continuous {{poll(0)}} calls made just for 
rebalance once KIP-62 is adopted. KIP-62 will handle the heartbeat mechanism of 
the consumer, so Streams will no longer need to poll frequently just for that. 
After this change, the memory pressure from {{ConsumerRecord}} objects should 
be reduced.

Does that sound good to you?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine, consisting 
> of a single KafkaStreams instance, that instance gets all partitions of the 
> target topic assigned.
> As we're using it to process topics that have a huge number of partitions and 
> heavy message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 
> 10MB/sec of traffic per partition, we saw all partitions assigned to the 
> first instance, and soon the app died of OOM.
> I know that there are some possible workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a single instance first and 
> check if it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed systems principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to provide the maximum number of partitions that they consider possible to 
> process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to a processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate an incomplete assignment. That is, Kafka Streams should continue 
> working on part of the partitions even if some partitions are left 
> unassigned, in order to satisfy this: "some users might want the option to 
> start a single instance first and check if it works as expected with a 
> smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.
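
The capped assignment proposed in the description above could look roughly 
like the following. This is only a minimal sketch of the idea, not the POC 
attached to the issue and not the actual StreamPartitionAssignor code: the 
class {{CappedTaskAssignment}}, the {{assign}} method, and the use of plain 
integers as task ids are all hypothetical names chosen for illustration. It 
hands out tasks round-robin, never gives one processId more than 
{{maxTasksAssigned}} tasks, and returns the leftover tasks instead of failing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Kafka Streams.
public class CappedTaskAssignment {

    /** Outcome of one assignment round: tasks per processId plus leftovers. */
    static final class Result {
        final Map<String, List<Integer>> assigned = new LinkedHashMap<>();
        final List<Integer> unassigned = new ArrayList<>();
    }

    /**
     * Distributes taskIds round-robin over processIds, giving each processId
     * at most maxTasksAssigned tasks. Tasks that fit nowhere are reported as
     * unassigned rather than treated as an error (the "incomplete assignment").
     */
    static Result assign(List<String> processIds, List<Integer> taskIds, int maxTasksAssigned) {
        Result result = new Result();
        for (String id : processIds) {
            result.assigned.put(id, new ArrayList<>());
        }
        int next = 0; // round-robin cursor into processIds
        for (Integer task : taskIds) {
            boolean placed = false;
            // Try each processId at most once, starting at the cursor.
            for (int tried = 0; tried < processIds.size() && !placed; tried++) {
                List<Integer> tasks = result.assigned.get(processIds.get(next));
                next = (next + 1) % processIds.size();
                if (tasks.size() < maxTasksAssigned) {
                    tasks.add(task);
                    placed = true;
                }
            }
            if (!placed) {
                result.unassigned.add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One instance, five tasks, cap of two: three tasks stay unassigned.
        Result r = assign(Arrays.asList("instance-1"), Arrays.asList(0, 1, 2, 3, 4), 2);
        System.out.println("assigned=" + r.assigned + " unassigned=" + r.unassigned);
    }
}
```

With a single instance and a cap of two, the instance processes only two 
tasks and the remaining three wait until another instance joins, which is 
the "start one instance first and check it" workflow the description asks for.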


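The config-tuning workaround listed in the description can be sketched as 
below. The three keys are the ones named in the issue; the values, the class 
{{HeapPressureConfig}}, and both method names are assumptions made up for 
illustration, not recommended settings. The helper also shows why tuning 
alone may not be enough: as a rough upper bound, assuming every assigned 
partition returns a full fetch, the fetched data still grows linearly with 
the number of partitions assigned to the instance.

```java
import java.util.Properties;

// Hypothetical illustration only; the values are placeholders, not advice.
public class HeapPressureConfig {

    /** Builds properties that shrink per-partition buffering and fetch sizes. */
    static Properties lowMemoryProps() {
        Properties props = new Properties();
        // Kafka Streams: records buffered per partition before fetching pauses.
        props.put("buffered.records.per.partition", "100");
        // Consumer: maximum bytes returned per partition in one fetch.
        props.put("max.partition.fetch.bytes", "262144");
        // Consumer: maximum records returned by a single poll() call.
        props.put("max.poll.records", "500");
        return props;
    }

    /** Rough bound on fetched bytes if every assigned partition fills its fetch. */
    static long worstCaseFetchBytes(int assignedPartitions, long maxPartitionFetchBytes) {
        return assignedPartitions * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        Properties props = lowMemoryProps();
        long perPartition = Long.parseLong(props.getProperty("max.partition.fetch.bytes"));
        // 300 partitions is an arbitrary example count.
        System.out.println(worstCaseFetchBytes(300, perPartition) + " bytes");
    }
}
```

Lowering the per-partition limits reduces the bound, but only a cap on the 
number of assigned partitions removes the linear factor.
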

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
