[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905430#comment-16905430 ] Vinoth Chandar edited comment on KAFKA-7149 at 8/12/19 5:47 PM: While looking at changes to detect topology changes, I realized this was still more complicated than it needs to be. It turns out that just dictionary-encoding the topic names on the wire provides gains similar to both approaches above. The PR is updated accordingly; this is a lot simpler.
{code:java}
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 33226{code}
P.S: My benchmark code
{code:java}
public static void main(String[] args) {
    // Assumption: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
    // High number of hosts = 500; high number of partitions = 128
    final int numStandbyPerTask = 2;
    final String topicPrefix = "streams_topic_name";
    final int numTopicGroups = 4;
    final int numHosts = 500;
    final int numTopics = 20;
    final int partitionPerTopic = 128;
    final int numTasks = partitionPerTopic * numTopics;
    List<TaskId> activeTasks = new ArrayList<>();
    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
    // Add tasks across each topic group
    for (int tg = 0; tg < numTopicGroups; tg++) {
        for (int i = 0; i < (numTasks / numTopicGroups) / numHosts; i++) {
            TaskId taskId = new TaskId(tg, i);
            activeTasks.add(taskId);
            for (int j = 0; j < numStandbyPerTask; j++) {
                standbyTasks.computeIfAbsent(taskId, k -> new HashSet<>())
                            .add(new TopicPartition(topicPrefix + tg + i + j, j));
            }
        }
    }
    // Generate the actual global assignment map
    Random random = new Random(12345);
    for (int h = 0; h < numHosts; h++) {
        Set<TopicPartition> topicPartitions = new HashSet<>();
        for (int j = 0; j < numTasks / numHosts; j++) {
            int topicGroupId = random.nextInt(numTopicGroups);
            int topicIndex = random.nextInt(numTopics / numTopicGroups);
            String topicName = topicPrefix + topicGroupId + topicIndex;
            int partition = random.nextInt(128);
            topicPartitions.add(new TopicPartition(topicName, partition));
        }
        HostInfo hostInfo = new HostInfo("streams_host" + h, 123456);
        partitionsByHost.put(hostInfo, topicPartitions);
    }
    final AssignmentInfo oldAssignmentInfo = new AssignmentInfo(4, activeTasks, standbyTasks, partitionsByHost, 0);
    final AssignmentInfo newAssignmentInfo = new AssignmentInfo(5, activeTasks, standbyTasks, partitionsByHost, 0);
    System.out.format("oldAssignmentInfoBytes : %d , newAssignmentInfoBytes: %d \n",
            oldAssignmentInfo.encode().array().length, newAssignmentInfo.encode().array().length);
}{code}
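The dictionary-encoding idea above can be sketched in plain Java. This is a hypothetical illustration, not Kafka's actual AssignmentInfo wire format: each unique topic name is written once, and the (topic, partition) pairs then reference topics by an integer index, so long topic names are no longer repeated once per partition.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of dictionary-encoding topic names on the wire.
// Layout: [dictSize][len + utf8 name]* [pairCount][(topicIndex, partition)]*
public class TopicDictionaryEncoding {

    public static ByteBuffer encode(List<Map.Entry<String, Integer>> topicPartitions) {
        // Assign each unique topic name a dense integer index, in first-seen order.
        Map<String, Integer> dict = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            if (!dict.containsKey(tp.getKey())) {
                dict.put(tp.getKey(), dict.size());
            }
        }
        int size = 4; // dictionary entry count
        for (String topic : dict.keySet()) {
            size += 4 + topic.getBytes(StandardCharsets.UTF_8).length; // length prefix + bytes
        }
        size += 4 + topicPartitions.size() * 8; // pair count + (index, partition) int pairs
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(dict.size());
        for (String topic : dict.keySet()) {
            byte[] bytes = topic.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        buf.putInt(topicPartitions.size());
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            buf.putInt(dict.get(tp.getKey())); // topic referenced by index, not by name
            buf.putInt(tp.getValue());
        }
        buf.flip();
        return buf;
    }
}
```

With 128 partitions per topic, each topic name is serialized once instead of 128 times, which is where a byte reduction of the kind reported above comes from.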
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904245#comment-16904245 ] Vinoth Chandar edited comment on KAFKA-7149 at 8/9/19 11:31 PM: Pasting some size tests for the old and new assignment information:
{code:java}
// Assumptions : Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; High number of partitions = 128
// topicPrefix = "streams_topic_name"; <- gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we will be reducing from ~39MB -> ~21MB. (NOTE: this is still a single object only; we still need the protocol change/compression on internal topics to ultimately fix the large-message problem)

was (Author: vc): Pasting some size tests for the old and new assignment information:
{code:java}
// Assumptions : Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; High number of partitions = 128
// topicPrefix = "streams_topic_name"; <- gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we will be reducing from ~39MB -> ~21MB. (NOTE: this is still a single object only; we still need the protocol change/compression on internal topics to ultimately fix the large-message problem)

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Ashish Surana >Assignee: Vinoth Chandar >Priority: Major > > We observed that when we have a high number of partitions, instances or > stream-threads, the assignment-data size grows too fast and we start getting > the RecordTooLargeException below at the kafka-broker. > A workaround for this issue is described at: > https://issues.apache.org/jira/browse/KAFKA-6976 > Still, it limits the scalability of kafka streams, as moving around 100MBs of > assignment data for each rebalance affects performance & reliability > (timeout exceptions start appearing) as well. This also limits kafka streams > scale even with a high max.message.bytes setting, as data size increases pretty > quickly with the number of partitions, instances or stream-threads. > > Solution: > To address this issue in our cluster, we are sending compressed > assignment-data. We saw the assignment-data size reduced by 8X-10X. This improved > kafka streams scalability drastically for us and we could now run it with > more than 8,000 partitions. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
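The compression mentioned in the NOTE can be sketched with the JDK's built-in gzip support. This is a minimal illustration assuming the serialized assignment is available as a byte array (the helper below uses a synthetic payload, not the real Streams serialization path):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Minimal sketch: gzip the serialized assignment bytes before sending.
// Repetitive content (topic names, host names) compresses very well.
public class AssignmentCompression {

    public static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw); // closing the stream flushes the gzip trailer
        }
        return out.toByteArray();
    }

    // Synthetic stand-in for serialized assignment data: many repeated topic names.
    public static byte[] syntheticAssignmentBytes(int partitions) {
        StringBuilder sb = new StringBuilder();
        for (int p = 0; p < partitions; p++) {
            sb.append("streams_topic_name0:").append(p).append(';');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the assignment repeats the same topic and host strings many times, gzip alone accounts for the large (8X-10X) reductions reported in this ticket.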
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904245#comment-16904245 ] Vinoth Chandar edited comment on KAFKA-7149 at 8/9/19 11:30 PM: Pasting some size tests for the old and new assignment information:
{code:java}
// Assumptions : Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; High number of partitions = 128
// topicPrefix = "streams_topic_name"; <- gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we will be reducing from ~39MB -> ~21MB. (NOTE: this is still a single object only; we still need the protocol change/compression on internal topics to ultimately fix the large-message problem)

was (Author: vc): Pasting some size tests for the old and new assignment information:
{code:java}
// Assumptions : Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; High number of partitions = 128
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we will be reducing from ~39MB -> ~21MB. (NOTE: this is still a single object only; we still need the protocol change/compression on internal topics to ultimately fix the large-message problem)

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Ashish Surana >Assignee: Vinoth Chandar >Priority: Major
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657902#comment-16657902 ] Ismael Juma edited comment on KAFKA-7149 at 10/20/18 4:36 PM: -- It's way too late for 2.1.0 unless it's a blocker. I've moved to 2.2.0, but please let us know if this needs to be in 2.1.0. was (Author: ijuma): It's way too late for 2.1.0 unless it's a blocker. Can we please move this to 2.2.0? > Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Ashish Surana >Assignee: Navinder Brar >Priority: Major > Fix For: 2.2.0
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631333#comment-16631333 ] Ashish Surana edited comment on KAFKA-7149 at 9/28/18 4:18 AM: --- Navinder raised a PR at [https://github.com/apache/kafka/pull/5663]. Somehow he is not showing up in the Assignee list. was (Author: asurana): Navinder raised a PR at [https://github.com/apache/kafka/pull/5663] > Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major > Fix For: 2.1.0
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615271#comment-16615271 ] Navinder Brar edited comment on KAFKA-7149 at 9/14/18 7:24 PM: --- Hi [~guozhang] What I mean is that currently the Assignment shared with the Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> partitionsByHost}{code}
Now in the first version, I am changing this AssignmentInfo to: *V1:*
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TaskId>> tasksByHost}{code}
But my point is that if there are 500 consumers, the tasksByHost map, which contains the global assignment, will be the same for all of them, yet we are unnecessarily sending this same map inside the Assignment array for every consumer. Instead, we can send an object like the one below, which is shared with the GroupCoordinator. *V2:*
{code:java}
Assignment = {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]}{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks}{code}

was (Author: navibrar): Hi [~guozhang] What I mean is that currently the Assignment shared with the Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> partitionsByHost}{code}
Now in the first version, I am changing this AssignmentInfo to: *V1:*
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TaskId>> tasksByHost}{code}
But my point is that if there are 500 consumers, the tasksByHost map, which contains the global assignment, will be the same for all of them, yet we are unnecessarily sending this same map inside the Assignment array for every consumer. Instead, we can send an object like the one below, which is shared with the GroupCoordinator. *V2:*
{code:java}
Assignment = {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ...]}{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks}{code}

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major
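The V2 restructuring above can be illustrated with plain Java stand-ins (hypothetical class and field names; Kafka's real TaskId/HostInfo types are replaced with strings so the sketch is self-contained): the global tasksByHost map lives once at the group level instead of inside each consumer's AssignmentInfo.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the V2 proposal: per-consumer data no longer carries
// the global map; it is hoisted to a single group-level field.
public class AssignmentV2 {

    // Per-consumer payload: only this consumer's own tasks.
    public static class ConsumerAssignment {
        public final List<String> activeTasks;
        public final Map<String, Set<String>> standbyTasks;

        public ConsumerAssignment(List<String> activeTasks, Map<String, Set<String>> standbyTasks) {
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    // Group-level payload: the global tasksByHost map appears exactly once,
    // no matter how many consumers there are.
    public static class GroupAssignment {
        public final Map<String, Set<String>> tasksByHost;
        public final Map<String, ConsumerAssignment> consumers;

        public GroupAssignment(Map<String, Set<String>> tasksByHost, Map<String, ConsumerAssignment> consumers) {
            this.tasksByHost = tasksByHost;
            this.consumers = consumers;
        }
    }

    // With N consumers, V1 serializes the global map N times; V2 serializes it once.
    public static int globalMapCopiesOnWire(boolean v2, int numConsumers) {
        return v2 ? 1 : numConsumers;
    }
}
```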
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610 ] Navinder Brar edited comment on KAFKA-7149 at 9/14/18 8:01 AM: --- [~guozhang] [~mjsax] I am working with [~asurana] on raising this PR, and will send it in a couple of days. Currently, the change I have made is to use taskIds instead of topicPartitions in AssignmentInfo. Another thing I observed is that we send the same assignmentInfo to all consumers, so we replicate the complete assignment (of all hosts and partitions) to every consumer. Maybe we can take the partitionsByHost (now taskIdsByHost) map out of the consumers array so that it is not replicated for all the hosts and is sent just once. With the current changes (changing TopicPartitions to TaskIds and using GZIP compression) I have reduced the assignment size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If we can stop the replication of partitionsByHost on each consumer, the assignment size can be reduced to a few hundred KB. Please share your thoughts.

was (Author: navibrar): [~guozhang] I am working with [~asurana] on raising this PR, and will send it in a couple of days. Currently, the change I have made is to use taskIds instead of topicPartitions in AssignmentInfo. Another thing I observed is that we send the same assignmentInfo to all consumers, so we replicate the complete assignment (of all hosts and partitions) to every consumer. Maybe we can take the partitionsByHost (now taskIdsByHost) map out of the consumers array so that it is not replicated for all the hosts and is sent just once. With the current changes (changing TopicPartitions to TaskIds and using GZIP compression) I have reduced the assignment size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If we can stop the replication of partitionsByHost on each consumer, the assignment size can be reduced to a few hundred KB. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major
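A back-of-the-envelope sketch of why TaskIds are so much smaller on the wire than TopicPartitions (the byte layouts below are illustrative assumptions, not Kafka's exact serialization): a TaskId is just two ints, while a TopicPartition repeats the full topic name for every entry.

```java
import java.nio.charset.StandardCharsets;

// Illustrative wire sizes: a TaskId encodes as two 4-byte ints, while a
// TopicPartition carries a length-prefixed topic name plus a partition int.
public class TaskIdVsTopicPartitionSize {

    public static int taskIdBytes() {
        return 4 + 4; // topicGroupId + partition
    }

    public static int topicPartitionBytes(String topic) {
        // name length prefix + UTF-8 name bytes + partition
        return 4 + topic.getBytes(StandardCharsets.UTF_8).length + 4;
    }
}
```

With realistic topic names of 20+ characters, each entry shrinks roughly 3-4x before any compression is applied, which compounds with gzip across thousands of entries.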
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610 ] Navinder Brar edited comment on KAFKA-7149 at 9/12/18 7:00 AM: --- [~guozhang] I am working with [~asurana] on raising this PR, and will send it in a couple of days. Currently, the change I have made is to use taskIds instead of topicPartitions in AssignmentInfo. Another thing I observed is that we send the same assignmentInfo to all consumers, so we replicate the complete assignment (of all hosts and partitions) to every consumer. Maybe we can take the partitionsByHost (now taskIdsByHost) map out of the consumers array so that it is not replicated for all the hosts and is sent just once. With the current changes (changing TopicPartitions to TaskIds and using GZIP compression) I have reduced the assignment size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If we can stop the replication of partitionsByHost on each consumer, the assignment size can be reduced to a few hundred KB. Please share your thoughts.

was (Author: navibrar): I am working with [~asurana] on raising this PR, and will send it in a couple of days. Currently, the change I have made is to use taskIds instead of topicPartitions in AssignmentInfo. Another thing I observed is that we send the same assignmentInfo to all consumers, so we replicate the complete assignment (of all hosts and partitions) to every consumer. Maybe we can take the partitionsByHost (now taskIdsByHost) map out of the consumers array so that it is not replicated for all the hosts and is sent just once. With the current changes (changing TopicPartitions to TaskIds and using GZIP compression) I have reduced the assignment size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If we can stop the replication of partitionsByHost on each consumer, the assignment size can be reduced to a few hundred KB. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606957#comment-16606957 ] Ashish Surana edited comment on KAFKA-7149 at 9/7/18 10:29 AM: --- [~guozhang] As you suggested, the below changes can reduce the assignment data size, especially when a topic-group has multiple input topics, since topic-partitions are repeated in the assignment-info in that case: 1. Replace num.partitions with num.tasks 2. Replace [topic-partitions] with [task-ids] This will also require some changes to identify the corresponding host when doing a lookup for a given key in a store. Along with these encoding changes, compression is definitely required for further size reduction. [~mjsax]'s suggestion for handling this is what I was also thinking, and it makes sense to support backward/forward compatibility.

was (Author: asurana): [~guozhang] As you suggested, the below changes can reduce the assignment data size, especially when a topic-group has multiple input topics, since topic-partitions are repeated in the assignment-info in that case: 1. Replace num.partitions with num.tasks 2. Replace [topic-partitions] with [task-ids] This will also require some changes to identify the corresponding host when doing a lookup for a given key in a store. Along with these encoding changes, compression is definitely required for further size reduction. [~mjsax]'s suggestion for handling this is what I was also thinking, and it makes sense to support backward compatibility.

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539596#comment-16539596 ] Ashish Surana edited comment on KAFKA-7149 at 7/11/18 6:34 AM: --- Made the change here: [https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a] Scenario#1: This is straightforward, and works with this change. Encoded version: 4. Decoder's latest supported version: 4. Scenario#2: Encoded version: <=3 (encoded stream is non-gzip). Decoder's latest supported version: 4 (decoding as a gzip stream). Scenario#3: This is difficult, as the decoder learns the encoded version from the first few bytes of the stream, which might be zipped or non-zipped, with no reliable way to infer which. Encoded version: 4 (encoded stream is a gzip stream). Decoder's latest supported version: 3 (decoding as a non-gzip stream). The change is not backward compatible (Scenario#2 & #3), but it depicts the idea for this improvement. It's working well for us.

was (Author: asurana): Made the change here: https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a Scenario#1: This is straightforward, and works with this change. Encoded version: 4. Decoder's latest supported version: 4. Scenario#2: Encoded version: <=3 (encoded stream is non-gzip). Decoder's latest supported version: 4 (decoding as a gzip stream). Scenario#3: This is difficult, as the decoder learns the encoded version from the first few bytes of the stream, which might be zipped or non-zipped, with no reliable way to infer which. Encoded version: 4 (encoded stream is a gzip stream). Decoder's latest supported version: 3 (decoding as a non-gzip stream). The change is not backward compatible (Scenario#2 & #3), but it depicts the idea for this improvement.

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major
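One possible way to sidestep Scenario#3 (a hedged illustration only; it is not claimed that the linked commit does this) is to keep the version marker outside the compressed region, so any decoder can read the version first and then decide whether the remaining payload needs to be gunzipped:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical framing: [1 uncompressed version byte][payload, gzipped for v4+].
// Reading the version never requires guessing whether the stream is zipped.
public class VersionedEncoding {

    public static byte[] encodeV4(byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(4); // version marker stays outside the gzip stream
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(payload);
        }
        return out.toByteArray();
    }

    public static byte[] decode(byte[] encoded) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(encoded);
        int version = in.read();
        if (version >= 4) {
            try (GZIPInputStream gz = new GZIPInputStream(in)) {
                return gz.readAllBytes(); // v4+: payload is gzipped
            }
        }
        return in.readAllBytes(); // v3 and below: payload was never compressed
    }
}
```

An old (v3) decoder would still fail on a v4 payload, but it would at least see an unambiguous version number rather than misparse gzip header bytes as a version.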
[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability
[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539596#comment-16539596 ] Ashish Surana edited comment on KAFKA-7149 at 7/11/18 6:33 AM: --- Made the change here: https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a Scenario#1: This is straightforward, and works with this change. Encoded version: 4. Decoder's latest supported version: 4. Scenario#2: Encoded version: <=3 (encoded stream is non-gzip). Decoder's latest supported version: 4 (decoding as a gzip stream). Scenario#3: This is difficult, as the decoder learns the encoded version from the first few bytes of the stream, which might be zipped or non-zipped, with no reliable way to infer which. Encoded version: 4 (encoded stream is a gzip stream). Decoder's latest supported version: 3 (decoding as a non-gzip stream). The change is not backward compatible (Scenario#2 & #3), but it depicts the idea for this improvement.

was (Author: asurana): This change is not backward compatible, but depicts the idea for this improvement.

> Reduce assignment data size to improve kafka streams scalability > > > Key: KAFKA-7149 > URL: https://issues.apache.org/jira/browse/KAFKA-7149 > Project: Kafka > Issue Type: Improvement >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major