BugFinder created KAFKA-14029:
---------------------------------

             Summary: Consumer response serialization could block other response handlers at scale
                 Key: KAFKA-14029
                 URL: https://issues.apache.org/jira/browse/KAFKA-14029
             Project: Kafka
          Issue Type: Improvement
          Components: consumer
    Affects Versions: 3.2.0
            Reporter: BugFinder


Hi,

We have been using our in-house tools to test Kafka's scalability, to get an 
idea of how a large-scale deployment will behave and where the bottlenecks are. 
For now, we are looking at version 3.2 and focusing on a many-consumers scenario.

Consumer-wise, we want to report a possible issue and eventually propose a 
solution, aiming to build our expertise in the system. When adding a new 
consumer to a group, the code path

org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle *// (has a synchronized block)*
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected
  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected
   org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
    org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
     org.apache.kafka.common.protocol.MessageUtil.toVersionPrefixedByteBuffer *// linear in the size of the message*

could end up being costly when the message is large. 
toVersionPrefixedByteBuffer seems to be linear in the size of the message, and 
although writing an array in linear time is not unreasonable at all, {*}under 
certain conditions, e.g. while holding a lock{*}, it can cause {*}undesired 
contention{*}. In this case, it is invoked to serialize the assignment when 
adding a new consumer. There is also a loop wrapping this path that seems to 
depend on the number of assignments, which could be another problematic 
dimension as it grows, because the per-message cost is then nested inside the 
per-assignment loop. The loop is here:

[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected]

// ...

Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
    // calls toVersionPrefixedByteBuffer
    ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
    groupAssignment.put(assignmentEntry.getKey(), buffer);
}

// ...

The question here is: {*}is there a need to serialize the assignment inside the 
synchronized block{*}? If the assignment is too large, it could easily add a 
few seconds to the request and block other handlers that use the same lock, 
such as HeartbeatResponseHandler.
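
To make the question concrete, here is a minimal, self-contained sketch of the 
general idea: do the linear-time serialization before taking the lock, and hold 
the lock only to publish the result. It does not use Kafka's classes. The class 
AssignmentSerializationSketch, its serialize method (a stand-in for 
ConsumerProtocol.serializeAssignment), the lock field and the String-based 
assignment type are all hypothetical names chosen for illustration. Whether the 
real coordinator state can safely be read outside the lock is an assumption 
that would need to be checked against the actual code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSerializationSketch {
    // Hypothetical lock shared with other response handlers (e.g. heartbeats).
    private final Object lock = new Object();
    private Map<String, ByteBuffer> groupAssignment = new HashMap<>();

    // Hypothetical stand-in for ConsumerProtocol.serializeAssignment:
    // writes a 2-byte version prefix followed by the payload, so the cost
    // is linear in the size of the assignment.
    static ByteBuffer serialize(List<String> partitions) {
        byte[] payload = String.join(",", partitions).getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + payload.length);
        buffer.putShort((short) 0); // version prefix
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    void onLeaderElected(Map<String, List<String>> assignments) {
        // Expensive part: runs without holding the lock, so other handlers
        // that need the lock are not blocked while buffers are written.
        Map<String, ByteBuffer> serialized = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : assignments.entrySet()) {
            serialized.put(entry.getKey(), serialize(entry.getValue()));
        }
        // Cheap part: only the reference swap happens under the lock.
        synchronized (lock) {
            groupAssignment = serialized;
        }
    }

    int assignmentCount() {
        synchronized (lock) {
            return groupAssignment.size();
        }
    }

    public static void main(String[] args) {
        AssignmentSerializationSketch sketch = new AssignmentSerializationSketch();
        sketch.onLeaderElected(Map.of(
                "consumer-1", List.of("topic-0", "topic-1"),
                "consumer-2", List.of("topic-2")));
        System.out.println("serialized assignments: " + sketch.assignmentCount());
    }
}
```

With this shape, the time spent under the lock no longer depends on the number 
or size of the assignments, only on a single reference assignment.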

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
