[ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905430#comment-16905430 ]

Vinoth Chandar commented on KAFKA-7149:
---------------------------------------

While looking at changes to detect topology changes, I realized this was still
more complicated than it has to be. It turns out that simply dictionary-encoding
the topic names on the wire provides gains similar to both approaches above. The
PR is updated accordingly; this is a lot simpler.

 
{code:java}
oldAssignmentInfoBytes : 77684 , newAssignmentInfoBytes: 33226{code}
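
For reference, here is a minimal sketch of the dictionary-encoding idea (the class and method names below are hypothetical, not the actual PR code): each distinct topic name is written to the wire exactly once, in a dictionary, and every partition then refers to its topic by integer index, so a long topic name repeated across hundreds of partitions costs four bytes per occurrence instead of the full string.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hypothetical illustration of dictionary-encoding topic names on the wire.
public class TopicDictionaryEncoder {

    public static byte[] encode(final Collection<TopicPartition> partitions) throws IOException {
        final Map<String, Integer> topicToIndex = new HashMap<>();
        final List<String> dictionary = new ArrayList<>();

        // First pass: assign each distinct topic name a dictionary index.
        for (final TopicPartition tp : partitions) {
            topicToIndex.computeIfAbsent(tp.topic(), topic -> {
                dictionary.add(topic);
                return dictionary.size() - 1;
            });
        }

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputStream out = new DataOutputStream(baos);

        // Write each distinct topic name once...
        out.writeInt(dictionary.size());
        for (final String topic : dictionary) {
            out.writeUTF(topic);
        }
        // ...then write each partition as (topic index, partition number)
        // instead of (full topic name, partition number).
        out.writeInt(partitions.size());
        for (final TopicPartition tp : partitions) {
            out.writeInt(topicToIndex.get(tp.topic()));
            out.writeInt(tp.partition());
        }
        out.flush();
        return baos.toByteArray();
    }
}
{code}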
 

P.S. My benchmark code:

 
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.HostInfo;

public class AssignmentInfoBenchmark {

    public static void main(final String[] args) {
        // Assumption: a Streams topology with 10 input topics and 4 sub-topologies
        // (2 topics per sub-topology) = ~20 topics total.
        // High number of hosts = 500; high number of partitions per topic = 128.
        final int numStandbyPerTask = 2;
        final String topicPrefix = "streams_topic_name";
        final int numTopicGroups = 4;
        final int numHosts = 500;
        final int numTopics = 20;
        final int partitionPerTopic = 128;
        final int numTasks = partitionPerTopic * numTopics;

        final List<TaskId> activeTasks = new ArrayList<>();
        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();

        // Add this host's share of active tasks for each topic group,
        // plus standby replicas for each task.
        for (int tg = 0; tg < numTopicGroups; tg++) {
            for (int i = 0; i < (numTasks / numTopicGroups) / numHosts; i++) {
                final TaskId taskId = new TaskId(tg, i);
                activeTasks.add(taskId);
                for (int j = 0; j < numStandbyPerTask; j++) {
                    standbyTasks.computeIfAbsent(taskId, k -> new HashSet<>())
                                .add(new TopicPartition(topicPrefix + tg + i + j, j));
                }
            }
        }

        // Generate the actual global host -> topic-partitions assignment map.
        final Random random = new Random(12345);
        for (int h = 0; h < numHosts; h++) {
            final Set<TopicPartition> topicPartitions = new HashSet<>();
            for (int j = 0; j < numTasks / numHosts; j++) {
                final int topicGroupId = random.nextInt(numTopicGroups);
                final int topicIndex = random.nextInt(numTopics / numTopicGroups);
                final String topicName = topicPrefix + topicGroupId + topicIndex;
                final int partition = random.nextInt(partitionPerTopic);
                topicPartitions.add(new TopicPartition(topicName, partition));
            }
            final HostInfo hostInfo = new HostInfo("streams_host" + h, 123456);
            partitionsByHost.put(hostInfo, topicPartitions);
        }

        // Encode the same assignment with the old wire format (version 4) and the
        // new dictionary-encoded format (version 5), and compare the byte sizes.
        final AssignmentInfo oldAssignmentInfo =
            new AssignmentInfo(4, activeTasks, standbyTasks, partitionsByHost, 0);
        final AssignmentInfo newAssignmentInfo =
            new AssignmentInfo(5, activeTasks, standbyTasks, partitionsByHost, 0);
        System.out.format("oldAssignmentInfoBytes : %d , newAssignmentInfoBytes: %d \n",
                          oldAssignmentInfo.encode().array().length,
                          newAssignmentInfo.encode().array().length);
    }
}
{code}

> Reduce assignment data size to improve kafka streams scalability
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7149
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7149
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Ashish Surana
>            Assignee: Vinoth Chandar
>            Priority: Major
>
> We observed that with a high number of partitions, instances, or
> stream-threads, the assignment-data size grows too fast, and we start
> getting RecordTooLargeException at the kafka-broker.
> A workaround for this issue is described at:
> https://issues.apache.org/jira/browse/KAFKA-6976
> Even so, the scalability of kafka streams remains limited, since moving
> around ~100MB of assignment data for each rebalance hurts performance and
> reliability (timeout exceptions start appearing). This also caps kafka
> streams scale even with a high max.message.bytes setting, as the data size
> grows quickly with the number of partitions, instances, or stream-threads.
>
> Solution:
> To address this issue in our cluster, we send the assignment-data
> compressed. We saw the assignment-data size reduced by 8X-10X. This
> drastically improved kafka streams scalability for us, and we can now run
> with more than 8,000 partitions.
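
For illustration, a minimal sketch of that compression workaround (assuming gzip over the encoded assignment bytes; the class name is hypothetical):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of the workaround: gzip the encoded
// assignment-data before sending it, and gunzip it on receipt.
public class AssignmentCompression {

    public static byte[] compress(final byte[] assignmentBytes) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(assignmentBytes);
        }
        return baos.toByteArray();
    }
}
{code}

Since repeated topic names dominate the payload, it compresses very well, which is consistent with the 8X-10X reduction reported above.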


