[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/14/18 8:01 AM:
---------------------------------------------------------------

[~guozhang] [~mjsax] I am working with [~asurana] on raising this PR and will 
send it in a couple of days. So far, the change I have made is to use taskIds 
instead of topicPartitions in AssignmentInfo. Another thing I observed is that 
we send the same assignmentInfo to all consumers, so the complete assignment 
(of all hosts and partitions) is replicated to every consumer. Maybe we can 
take the partitionsByHost (now TaskIdsByHost) map out of the consumers array 
so that it is sent just once instead of being replicated for every host. With 
the current changes (changing TopicPartitions to TaskIDs and using GZIP 
compression) I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from 196 MB to 8 MB. If we can stop replicating 
partitionsByHost for each consumer, the assignment size can be reduced to a 
few hundred KB. Please share your thoughts.
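
To illustrate why switching from topic-partitions to task IDs shrinks the 
host map, here is a minimal sketch. It is not the actual AssignmentInfo code: 
the class and method names are hypothetical, and it assumes a topic-partition 
is encoded as a UTF string plus an int while a task ID is encoded as two ints 
(topic group ID and partition).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch, not Kafka's AssignmentInfo: compares the encoded size
// of (topic name, partition) pairs against (topicGroupId, partition) pairs.
final class HostMapEncodingSketch {

    // Assumed layout: entry count, then a UTF topic name and an int per entry.
    static byte[] encodeTopicPartitions(String[] topics, int[] partitions) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(topics.length);
            for (int i = 0; i < topics.length; i++) {
                out.writeUTF(topics[i]);      // 2-byte length + name bytes
                out.writeInt(partitions[i]);  // 4 bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Assumed layout: entry count, then two ints per entry.
    static byte[] encodeTaskIds(int[] topicGroupIds, int[] partitions) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(topicGroupIds.length);
            for (int i = 0; i < topicGroupIds.length; i++) {
                out.writeInt(topicGroupIds[i]);  // 4 bytes
                out.writeInt(partitions[i]);     // 4 bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        String[] topics = {"orders", "orders", "orders"};
        int[] groups = {0, 0, 0};
        int[] partitions = {0, 1, 2};
        System.out.println("topic-partitions: "
                + encodeTopicPartitions(topics, partitions).length + " bytes");
        System.out.println("task ids: "
                + encodeTaskIds(groups, partitions).length + " bytes");
    }
}
```

With the three example entries above, the topic-partition form takes 40 bytes 
and the task ID form takes 28 bytes. The gap grows with longer topic names, 
because the task ID form has a fixed size per entry.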


> Reduce assignment data size to improve kafka streams scalability
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7149
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7149
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ashish Surana
>            Assignee: Ashish Surana
>            Priority: Major
>
> We observed that with a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described in a comment at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Even so, it limits the scalability of Kafka Streams, because moving around 
> 100 MB of assignment data on each rebalance also affects performance and 
> reliability (timeout exceptions start appearing). It also limits Kafka 
> Streams scale even with a high max.message.bytes setting, because the data 
> size increases quickly with the number of partitions, instances, or 
> stream-threads.
>  
> Solution:
> To address this issue in our cluster, we send the assignment data 
> compressed. We saw the assignment-data size reduced by 8X-10X. This improved 
> Kafka Streams scalability drastically for us, and we can now run it with 
> more than 8,000 partitions.
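
The compression step described in the issue could look roughly like the 
sketch below. It is only an illustration: the class and method names are 
hypothetical, and it assumes the assignment has already been serialized to a 
byte array that is GZIP-compressed before sending and decompressed on 
receipt. The actual patch may be structured differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: GZIP round trip for a serialized assignment payload.
final class AssignmentCompressionSketch {

    // Compress an already-serialized payload.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the GZIP stream writes the trailer, so read the buffer
        // only after the try-with-resources block has finished.
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Restore the original payload from its compressed form.
    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream gzip =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Made-up, repetitive payload standing in for assignment data.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            sb.append("host-").append(i % 4).append(":9092 task 0_")
              .append(i % 8).append(';');
        }
        byte[] raw = sb.toString().getBytes(
                java.nio.charset.StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        System.out.println(raw.length + " bytes raw, "
                + packed.length + " bytes compressed");
    }
}
```

Assignment data tends to repeat the same host names and topic names many 
times, which is why a general-purpose compressor can help here. The ratio 
seen on real data depends on the payload; the 8X-10X figure comes from the 
reporter's cluster, not from this sketch.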



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
