[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-08-12 Thread Vinoth Chandar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905430#comment-16905430
 ] 

Vinoth Chandar edited comment on KAFKA-7149 at 8/12/19 5:47 PM:


While looking at the changes for detecting topology changes, I realized this was 
still more complicated than it has to be. It turns out that just 
dictionary-encoding the topic names on the wire can provide similar gains as 
both approaches above. The PR has been updated accordingly; it is a lot simpler.
{code:java}
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 33226{code}
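For illustration, here is a minimal sketch of dictionary-encoding the topic names in the partitionsByHost map. This is not the exact wire format used in the PR; the helper name and field layout are only assumptions:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.HostInfo;

public class TopicDictionaryEncodingSketch {

    // Write each distinct topic name once, then refer to it by its index.
    public static byte[] encodePartitionsByHost(
            final Map<HostInfo, Set<TopicPartition>> partitionsByHost) throws IOException {
        final Map<String, Integer> topicToIndex = new HashMap<>();
        final List<String> dictionary = new ArrayList<>();
        for (final Set<TopicPartition> partitions : partitionsByHost.values()) {
            for (final TopicPartition tp : partitions) {
                topicToIndex.computeIfAbsent(tp.topic(), t -> {
                    dictionary.add(t);
                    return dictionary.size() - 1;
                });
            }
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(dictionary.size());
        for (final String topic : dictionary) {
            out.writeUTF(topic);                            // each topic name written exactly once
        }
        out.writeInt(partitionsByHost.size());
        for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
            out.writeUTF(entry.getKey().host());
            out.writeInt(entry.getKey().port());
            out.writeInt(entry.getValue().size());
            for (final TopicPartition tp : entry.getValue()) {
                out.writeInt(topicToIndex.get(tp.topic())); // 4-byte index instead of the full name
                out.writeInt(tp.partition());
            }
        }
        out.flush();
        return baos.toByteArray();
    }
}
{code}
With ~20 distinct topic names repeated across thousands of topic-partitions, every repeated occurrence shrinks from the full string to a 4-byte index, which is where a reduction of the kind shown above comes from.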
P.S: My benchmark code 
{code:java}
public static void main(String[] args) {

    // Assumption: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
    // High number of hosts = 500; high number of partitions = 128
    final int numStandbyPerTask = 2;
    final String topicPrefix = "streams_topic_name";
    final int numTopicGroups = 4;
    final int numHosts = 500;
    final int numTopics = 20;
    final int partitionPerTopic = 128;
    final int numTasks = partitionPerTopic * numTopics;

    List<TaskId> activeTasks = new ArrayList<>();
    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();

    // add tasks across each topic group
    for (int tg = 0; tg < numTopicGroups; tg++) {
        for (int i = 0; i < (numTasks / numTopicGroups) / numHosts; i++) {
            TaskId taskId = new TaskId(tg, i);
            activeTasks.add(taskId);
            for (int j = 0; j < numStandbyPerTask; j++) {
                standbyTasks.computeIfAbsent(taskId, k -> new HashSet<>())
                            .add(new TopicPartition(topicPrefix + tg + i + j, j));
            }
        }
    }

    // generate the actual global assignment map
    Random random = new Random(12345);
    for (int h = 0; h < numHosts; h++) {
        Set<TopicPartition> topicPartitions = new HashSet<>();
        for (int j = 0; j < numTasks / numHosts; j++) {
            int topicGroupId = random.nextInt(numTopicGroups);
            int topicIndex = random.nextInt(numTopics / numTopicGroups);
            String topicName = topicPrefix + topicGroupId + topicIndex;
            int partition = random.nextInt(128);
            topicPartitions.add(new TopicPartition(topicName, partition));
        }
        HostInfo hostInfo = new HostInfo("streams_host" + h, 123456);
        partitionsByHost.put(hostInfo, topicPartitions);
    }

    final AssignmentInfo oldAssignmentInfo = new AssignmentInfo(4, activeTasks, standbyTasks, partitionsByHost, 0);
    final AssignmentInfo newAssignmentInfo = new AssignmentInfo(5, activeTasks, standbyTasks, partitionsByHost, 0);
    System.out.format("oldAssignmentInfoBytes : %d , newAssignmentInfoBytes: %d \n",
        oldAssignmentInfo.encode().array().length,
        newAssignmentInfo.encode().array().length);
}{code}


was (Author: vc):
While looking at the changes for detecting topology changes, I realized this was 
still more complicated than it has to be. It turns out that just 
dictionary-encoding the topic names on the wire can provide similar gains as 
both approaches above. The PR has been updated accordingly; it is a lot simpler.

 
{code:java}
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 33226{code}
 

P.S: My benchmark code 

 
{code:java}
public static void main(String[] args) {

    // Assumption: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
    // High number of hosts = 500; high number of partitions = 128
    final int numStandbyPerTask = 2;
    final String topicPrefix = "streams_topic_name";
    final int numTopicGroups = 4;
    final int numHosts = 500;
    final int numTopics = 20;
    final int partitionPerTopic = 128;
    final int numTasks = partitionPerTopic * numTopics;

    List<TaskId> activeTasks = new ArrayList<>();
    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();

    // add tasks across each topic group
    for (int tg = 0; tg < numTopicGroups; tg++) {
        for (int i = 0; i < (numTasks / numTopicGroups) / numHosts; i++) {
            TaskId taskId = new TaskId(tg, i);
            activeTasks.add(taskId);
            for (int j = 0; j < numStandbyPerTask; j++) {
                standbyTasks.computeIfAbsent(taskId, k -> new HashSet<>())
                            .add(new TopicPartition(topicPrefix + tg + i + j, j));
            }
        }
    }

    // generate the actual global assignment map
    Random random = new Random(12345);
    for (int h = 0; h < numHosts; h++) {
        Set<TopicPartition> topicPartitions = new HashSet<>();
        for (int j = 0; j < numTasks / numHosts; j++) {
            int topicGroupId = random.nextInt(numTopicGroups);
            int topicIndex = random.nextInt(numTopics / numTopicGroups);
            String topicName = topicPrefix + topicGroupId + topicIndex;
            int partition = random.nextInt(128);
            topicPartitions.add(new TopicPartition(topicName, partition));
        }
        HostInfo hostInfo = new HostInfo("streams_host" + h, 123456);
        partitionsByHost.put(hostInfo, topicPartitions);
    }

    final AssignmentInfo oldAssignmentInfo = new AssignmentInfo(4, activeTasks, standbyTasks, partitionsByHost, 0);
    final AssignmentInfo newAssignmentInfo = new AssignmentInfo(5, activeTasks, standbyTasks, partitionsByHost, 0);
    System.out.format("oldAssignmentInfoBytes : %d , newAssignmentInfoBytes: %d \n",
        oldAssignmentInfo.encode().array().length,
        newAssignmentInfo.encode().array().length);
}{code}

[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-08-09 Thread Vinoth Chandar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904245#comment-16904245
 ] 

Vinoth Chandar edited comment on KAFKA-7149 at 8/9/19 11:31 PM:


Pasting some size tests for the old and new assignment info: 
{code:java}
// Assumptions: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; high number of partitions = 128
// topicPrefix = "streams_topic_name"; <- the gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we would be reducing from ~39 MB to ~21 MB.

(NOTE: this is still just a single object; we do need the protocol 
change/compression on internal topics to ultimately fix the large-message 
problem.)


was (Author: vc):
Pasting some size tests for the old and new assignment info: 
{code:java}
// Assumptions: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; high number of partitions = 128
// topicPrefix = "streams_topic_name"; <- the gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we would be reducing from ~39 MB to ~21 MB.

(NOTE: this is still just a single object; we do need the protocol 
change/compression on internal topics to ultimately fix the large-message 
problem.)

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Vinoth Chandar
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-08-09 Thread Vinoth Chandar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904245#comment-16904245
 ] 

Vinoth Chandar edited comment on KAFKA-7149 at 8/9/19 11:30 PM:


Pasting some size tests for the old and new assignment info: 
{code:java}
// Assumptions: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; high number of partitions = 128
// topicPrefix = "streams_topic_name"; <- the gains are, of course, very sensitive to topic name length
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we would be reducing from ~39 MB to ~21 MB.

(NOTE: this is still just a single object; we do need the protocol 
change/compression on internal topics to ultimately fix the large-message 
problem.)


was (Author: vc):
Pasting some size tests for the old and new assignment info: 
{code:java}
// Assumptions: Streams topology with 10 input topics, 4 sub-topologies (2 topics per sub-topology) = ~20 topics
// High number of hosts = 500; high number of partitions = 128
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 42698{code}
Roughly ~45% savings. For 500 hosts, we would be reducing from ~39 MB to ~21 MB.

(NOTE: this is still just a single object; we do need the protocol 
change/compression on internal topics to ultimately fix the large-message 
problem.)

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Vinoth Chandar
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657902#comment-16657902
 ] 

Ismael Juma edited comment on KAFKA-7149 at 10/20/18 4:36 PM:
--

It's way too late for 2.1.0 unless it's a blocker. I've moved it to 2.2.0, but 
please let us know if this needs to be in 2.1.0.


was (Author: ijuma):
It's way too late for 2.1.0 unless it's a blocker. Can we please move this to 
2.2.0?

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-27 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631333#comment-16631333
 ] 

Ashish Surana edited comment on KAFKA-7149 at 9/28/18 4:18 AM:
---

Navinder raised a PR at [https://github.com/apache/kafka/pull/5663]. Somehow he 
is not showing up in the Assignee list.


was (Author: asurana):
Navinder raised a PR at [https://github.com/apache/kafka/pull/5663]

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
> Fix For: 2.1.0
>
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615271#comment-16615271
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/14/18 7:24 PM:
---

Hi [~guozhang], what I mean is that currently the Assignment which is shared 
with the Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ... ]{code}
where
{code:java}
AssignmentInfo =
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
 Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now, in the first version, I am changing this AssignmentInfo to:

*V1:*
{code:java}
AssignmentInfo =
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
 Map<HostInfo, Set<TaskId>> tasksByHost}
{code}
But my point is that if there are 500 consumers, the tasksByHost map will be 
the same for all of them, since it contains the global assignment. We are 
unnecessarily sending this same map inside the Assignment array for every 
consumer. Instead, we can use an object like the one below, which is shared 
with the GroupCoordinator.

*V2:*
{code:java}
Assignment = {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ... ]}{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks}{code}
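As a rough illustration of the V2 idea (the class and field names below are illustrative, not the actual Streams classes), the global map would be carried once at the group level instead of inside every consumer's assignment:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;

// Hypothetical V2 container: the global tasksByHost map lives at the group level.
class GroupAssignmentV2 {
    Map<HostInfo, Set<TaskId>> tasksByHost;               // global map, serialized once
    Map<String, ConsumerAssignment> assignmentByConsumer;  // keyed by consumer/member id

    static class ConsumerAssignment {
        List<TopicPartition> activePartitions;
        List<TaskId> activeTasks;
        Map<TaskId, Set<TopicPartition>> standbyTasks;
    }
}
{code}
Back-of-the-envelope: with V1 the wire size grows roughly as numConsumers x sizeof(tasksByHost) plus the per-consumer parts, whereas with V2 the global map contributes only once.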


was (Author: navibrar):
Hi [~guozhang], what I mean is that currently the Assignment which is shared 
with the Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ... ]{code}
where
{code:java}
AssignmentInfo =
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
 Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now, in the first version, I am changing this AssignmentInfo to:

*V1:*
{code:java}
AssignmentInfo =
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
 Map<HostInfo, Set<TaskId>> tasksByHost}
{code}
But my point is that if there are 500 consumers, the tasksByHost map will be 
the same for all of them, since it contains the global assignment. We are 
unnecessarily sending this same map inside the Assignment array for every 
consumer. Instead, we can use an object like the one below, which is shared 
with the GroupCoordinator.

*V2:*
{code:java}
Assignment = {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, assignmentInfo2}}, ... ]}{code}
where
{code:java}
AssignmentInfo = {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks}{code}

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/14/18 8:01 AM:
---

[~guozhang] [~mjsax] I am working with [~asurana] on raising this PR and will 
send it in a couple of days. Currently, the change I have made is to use 
taskIds instead of topicPartitions in AssignmentInfo. Another thing I observed 
is that we are sending the same assignmentInfo to all consumers, so we are 
replicating the complete assignment (of all hosts and partitions) to every 
consumer. Maybe we can take the partitionsByHost (now taskIdsByHost) map out of 
the consumers array so that it is not replicated for all the hosts and is sent 
just once. With the current changes (changing TopicPartitions to TaskIds and 
using GZIP compression), I have reduced the assignment size (on 400 hosts with 
3 threads each, having 512 partitions) from ~196 MB to ~8 MB. If we can stop 
the replication of partitionsByHost on each consumer, the assignment size can 
be reduced to a few hundred KB. Please share your thoughts.
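For reference, a minimal sketch of the GZIP step on the already-encoded assignment bytes. Where exactly this hooks into AssignmentInfo.encode()/decode() is up to the actual PR; the class and method names below are illustrative only:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompressionSketch {

    // Gzip an already-encoded assignment payload before it is handed to the
    // group coordinator. Assumes a heap-backed ByteBuffer (array() is available).
    public static ByteBuffer gzip(final ByteBuffer encoded) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(baos)) {
            gz.write(encoded.array(), encoded.arrayOffset() + encoded.position(), encoded.remaining());
        }
        return ByteBuffer.wrap(baos.toByteArray());
    }
}
{code}
The decode side would wrap the received bytes in a GZIPInputStream, which is where the version-compatibility scenarios discussed elsewhere in this ticket come into play.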


was (Author: navibrar):
[~guozhang] I am working with [~asurana] on raising this PR and will send it in 
a couple of days. Currently, the change I have made is to use taskIds instead 
of topicPartitions in AssignmentInfo. Another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now taskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIds and using GZIP 
compression), I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from ~196 MB to ~8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KB. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-12 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/12/18 7:00 AM:
---

[~guozhang] I am working with [~asurana] on raising this PR and will send it in 
a couple of days. Currently, the change I have made is to use taskIds instead 
of topicPartitions in AssignmentInfo. Another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now taskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIds and using GZIP 
compression), I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from ~196 MB to ~8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KB. Please share your thoughts.


was (Author: navibrar):
I am working with [~asurana] on raising this PR and will send it in a couple of 
days. Currently, the change I have made is to use taskIds instead of 
topicPartitions in AssignmentInfo. Another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now taskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIds and using GZIP 
compression), I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from ~196 MB to ~8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KB. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-07 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606957#comment-16606957
 ] 

Ashish Surana edited comment on KAFKA-7149 at 9/7/18 10:29 AM:
---

[~guozhang] As you suggested, the changes below can reduce the assignment data 
size, especially when a topic-group has multiple input topics, since 
topic-partitions are repeated in the assignment-info in that case:

1. Replace num.partitions with num.tasks

2. Replace [topic-partitions] with [task-ids]

This will also require some changes to identify the corresponding host when 
doing a lookup for a given key in a store (see the sketch below).

Along with these encoding changes, compression is definitely required for 
further size reduction. [~mjsax]'s suggestion for handling this is what I was 
also thinking, and it makes sense to support backward/forward compatibility.
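A sketch of the lookup-side change, assuming the standard TaskId layout of (topicGroupId, partition); topicsByGroup here is a hypothetical map derived locally from the topology metadata. The receiver can expand task-ids back into topic-partitions instead of having them spelled out on the wire:
{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

public class TaskIdExpansionSketch {

    // Rebuild the topic-partitions owned by a task from locally known topology
    // metadata, so only compact task-ids need to travel on the wire.
    public static Set<TopicPartition> partitionsForTask(final TaskId task,
            final Map<Integer, Set<String>> topicsByGroup) {
        final Set<TopicPartition> result = new HashSet<>();
        for (final String topic
                : topicsByGroup.getOrDefault(task.topicGroupId, Collections.emptySet())) {
            result.add(new TopicPartition(topic, task.partition));
        }
        return result;
    }
}
{code}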


was (Author: asurana):
[~guozhang] As you suggested, the changes below can reduce the assignment data 
size, especially when a topic-group has multiple input topics, since 
topic-partitions are repeated in the assignment-info in that case:

1. Replace num.partitions with num.tasks

2. Replace [topic-partitions] with [task-ids]

This will also require some changes to identify the corresponding host when 
doing a lookup for a given key in a store.

Along with these encoding changes, compression is definitely required for 
further size reduction. [~mjsax]'s suggestion for handling this is what I was 
also thinking, and it makes sense to support backward compatibility.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-07-11 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539596#comment-16539596
 ] 

Ashish Surana edited comment on KAFKA-7149 at 7/11/18 6:34 AM:
---

Made the change here: 
[https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a]

Scenario #1: This is straightforward and works with this change.

Encoded version: 4

Decoder's latest supported version: 4

Scenario #2:

Encoded version: <=3 (stream encoded as non-gzip)

Decoder's latest supported version: 4 (decoding as a gzip stream)

Scenario #3: This is difficult, as the decoder learns the encoded version from 
the first few bytes of the stream, which might be zipped or non-zipped, and 
there is no reliable way to infer that.

Encoded version: 4 (encoded stream is a gzip stream)

Decoder's latest supported version: 3 (decoding as a non-gzip stream)

The change is not backward compatible (Scenarios #2 and #3), but it depicts the 
idea for this improvement. It's working well for us.
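To make Scenario #2 concrete: a version-4 decoder could sniff the GZIP magic bytes (0x1f 0x8b, per RFC 1952) to tell a gzipped v4 payload from a plain v<=3 one, as in the sketch below (class and method names are illustrative). Note this does not help Scenario #3, since already-deployed v3 decoders read the leading bytes as a plain version field and cannot be changed retroactively:
{code:java}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;

public class AssignmentDecodeSketch {

    // GZIP streams always begin with the two magic bytes 0x1f 0x8b.
    private static boolean looksLikeGzip(final ByteBuffer data) {
        return data.remaining() >= 2
                && (data.get(data.position()) & 0xff) == 0x1f
                && (data.get(data.position() + 1) & 0xff) == 0x8b;
    }

    // Wrap the payload in a GZIPInputStream only when it actually is gzipped, so a
    // v4 decoder can still read v<=3 (uncompressed) payloads.
    // Assumes a heap-backed ByteBuffer (array() is available).
    public static DataInputStream openForDecode(final ByteBuffer data) throws IOException {
        final InputStream raw = new ByteArrayInputStream(
                data.array(), data.arrayOffset() + data.position(), data.remaining());
        return new DataInputStream(looksLikeGzip(data) ? new GZIPInputStream(raw) : raw);
    }
}
{code}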


was (Author: asurana):
Made the change here: 
https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a

Scenario #1: This is straightforward and works with this change.

Encoded version: 4

Decoder's latest supported version: 4

Scenario #2:

Encoded version: <=3 (stream encoded as non-gzip)

Decoder's latest supported version: 4 (decoding as a gzip stream)

Scenario #3: This is difficult, as the decoder learns the encoded version from 
the first few bytes of the stream, which might be zipped or non-zipped, and 
there is no reliable way to infer that.

Encoded version: 4 (encoded stream is a gzip stream)

Decoder's latest supported version: 3 (decoding as a non-gzip stream)

The change is not backward compatible (Scenarios #2 and #3), but it depicts the 
idea for this improvement.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> the exception below at the Kafka broker.
> RecordTooLargeException
> The resolution of this issue is explained at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-07-11 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539596#comment-16539596
 ] 

Ashish Surana edited comment on KAFKA-7149 at 7/11/18 6:33 AM:
---

Made the change here: 
https://github.com/a-surana/kafka/commit/577992015d3bfc5a23e23b5bf32e40a3f92bc74a

Scenario #1: This is straightforward and works with this change.

Encoded version: 4

Decoder's latest supported version: 4

Scenario #2:

Encoded version: <=3 (stream encoded as non-gzip)

Decoder's latest supported version: 4 (decoding as a gzip stream)

Scenario #3: This is difficult, as the decoder learns the encoded version from 
the first few bytes of the stream, which might be zipped or non-zipped, and 
there is no reliable way to infer that.

Encoded version: 4 (encoded stream is a gzip stream)

Decoder's latest supported version: 3 (decoding as a non-gzip stream)

The change is not backward compatible (Scenarios #2 and #3), but it depicts the 
idea for this improvement.


was (Author: asurana):
This change is not backward compatible, but it depicts the idea for this 
improvement.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> the exception below at the Kafka broker.
> RecordTooLargeException
> The resolution of this issue is explained at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around ~100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment-data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)