[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467 ]

A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM:
--------------------------------------------------------------------------

Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. It's probably the same issue that [~flashmouse] 
found in KAFKA-15170.

Fortunately I just merged that fix and cherry-picked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. I'm not sure of the timing for 3.7.1, but 3.8 
is just a day from KIP freeze, which means that if all goes well, it will be 
available in a little over a month.

If you need an immediate resolution in the meantime, you have two options:

1) Disable rack-awareness, which will effectively make the assignor skip over 
the buggy code entirely (see the config sketch below the two options).

2) If you can build from source and don't require an official release, 
cherry-pick [this fix|https://github.com/apache/kafka/pull/13965] onto a branch 
of whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment, since 
trunk contains untested code, but you can at least rerun your test against the 
latest trunk build to make sure it fixes the issue you're experiencing. I'm 
pretty confident it will, though.
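
For option 1, here is a minimal sketch of what that looks like on a plain Java 
consumer. It assumes that by "rack-awareness" we mean the client.rack consumer 
config, and the broker address, group id, and deserializers below are just 
placeholders: the assignor only takes the rack-aware path when consumers 
advertise a rack, so simply not setting client.rack keeps it away from the 
buggy code.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoRackConsumer {
    public static KafkaConsumer<String, String> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
        // Intentionally NOT setting ConsumerConfig.CLIENT_RACK_CONFIG ("client.rack"):
        // with no rack advertised in the subscription, the assignor never enters the
        // rack-aware branch that contains the bug.
        return new KafkaConsumer<>(props);
    }
}
{code}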



> Rack aware sticky assignor minQuota violations
> ----------------------------------------------
>
>                 Key: KAFKA-16361
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16361
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.5.1, 3.7.0, 3.6.1
>            Reporter: Luke D
>            Priority: Major
>         Attachments: illegalstateexception.log
>
>
> In some low topic-replication scenarios, the rack-aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on the 
> application's implementation). While uncommon, the error is deterministic, 
> and so it persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1 and 3.6.1. We have reproduced it 
> locally in a test case on 3.6.1 and 3.7.0 (we did not try 3.5.1, but the 
> issue would likely be reproducible there as well). 
>  
> Here is the error and stack trace from our test case against 3.7.0:
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 66, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 4, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 29, leader = 3, replicas = [3,1,2], isr = [3,1,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 62, leader = 
> 3, replicas = [3,2,1], isr = [3,2,1], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 95, leader = 4, replicas = [4,3,2], isr = [4,3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 0, leader = 
> 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 25, leader = 4, replicas = [4,3,2], isr = [4,3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 58, leader = 
> 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 91, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 21, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 54, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 87, leader = 
> 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 17, leader = 2, replicas = [2,1], isr = [2,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 50, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 83, leader = 4, replicas = [4,3,2], isr = [4,3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 13, leader = 
> 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 46, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 79, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 9, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 42, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 75, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 5, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 38, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 71, leader = 
> 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 1, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 34, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 67, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 30, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 63, leader = 1, replicas = [1,2], isr = [1,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 26, leader = 
> 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 59, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 92, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 22, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 55, leader = 
> 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 88, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 18, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 51, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 84, leader = 
> 4, replicas = [4,2,1], isr = [4,2,1], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 14, leader = 1, replicas = [1], isr = [1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 47, leader = 
> 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 80, leader = 4, replicas = [4,1,2], isr = [4,1,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 10, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 43, leader = 2, replicas = [2,1], isr = [2,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 76, leader = 
> 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 6, leader = 3, replicas = [3,2], isr = [3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 39, leader = 
> 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 72, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 2, leader = 
> 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 35, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 68, leader = 
> 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 93, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 31, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 64, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 89, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 27, leader = 3, replicas = [3], isr = [3], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 60, leader = 
> 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 85, leader = 2, replicas = [2,1], isr = [2,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 23, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 56, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 81, leader = 
> 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 19, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 52, leader = 
> 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic 
> = topic_name, partition = 77, leader = 4, replicas = [4,1], isr = [4,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 15, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 48, leader = 3, replicas = [3,2], isr = [3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 73, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 11, leader = 3, replicas = [3,1], isr = [3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 44, leader = 
> 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 69, leader = 4, replicas = [4,3], isr = [4,3], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 7, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 40, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 65, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 3, leader = 4, replicas = [4,3], isr = [4,3], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 36, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 61, leader = 4, replicas = [4,3,2], isr = [4,3,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 94, leader = 
> 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 32, leader = 2, replicas = [2,1], isr = [2,1], 
> offlineReplicas = [])], controller = host-1:1 (id: 1 rack: rack-1))
>  
> GroupSubscription(subscriptions={Consumer-12=Subscription(topics=[topic_name],
>  userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, 
> rackId=rack-1), Consumer-8=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), 
> Consumer-10=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), 
> Consumer-7=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), 
> Consumer-11=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), 
> Consumer-9=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), 
> Consumer-0=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), 
> Consumer-2=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), 
> Consumer-1=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), 
> Consumer-4=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), 
> Consumer-3=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), 
> Consumer-6=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), 
> Consumer-5=Subscription(topics=[topic_name], userDataSize=4, 
> ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3)}) 
> {code}
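>  
> For reference, here is a minimal sketch of how a scenario like the dump above 
> can be driven through StickyAssignor.assign in a unit test. The class name and 
> the truncated partition list are illustrative placeholders (not our actual 
> test harness), and the constructor signatures assume a recent 3.x clients jar:
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.*;
> import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
> import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
> import org.apache.kafka.clients.consumer.StickyAssignor;
> import org.apache.kafka.common.Cluster;
> import org.apache.kafka.common.Node;
> import org.apache.kafka.common.PartitionInfo;
> 
> public class StickyAssignorRepro {
>     public static void main(String[] args) {
>         Node n1 = new Node(1, "host-1", 1, "rack-1");
>         Node n2 = new Node(2, "host-2", 1, "rack-2");
>         Node n3 = new Node(3, "host-3", 1, "rack-3");
>         Node n4 = new Node(4, "host-3", 1, "rack-3");
> 
>         // Only two partitions shown here; the real case has 96, with the
>         // leader/replica lists copied from the Cluster dump above.
>         List<PartitionInfo> partitions = Arrays.asList(
>             new PartitionInfo("topic_name", 0, n4, new Node[]{n4, n1, n2}, new Node[]{n4, n1, n2}),
>             new PartitionInfo("topic_name", 1, n2, new Node[]{n2}, new Node[]{n2}));
> 
>         Cluster cluster = new Cluster("cluster-id", Arrays.asList(n1, n2, n3, n4),
>             partitions, Collections.emptySet(), Collections.emptySet());
> 
>         // 13 consumers spread evenly across the 3 racks, each advertising its
>         // rackId; this is what sends the assignor down the rack-aware code path.
>         Map<String, Subscription> subscriptions = new HashMap<>();
>         String[] racks = {"rack-1", "rack-2", "rack-3"};
>         for (int i = 0; i < 13; i++) {
>             subscriptions.put("Consumer-" + i, new Subscription(
>                 Collections.singletonList("topic_name"), ByteBuffer.allocate(0),
>                 Collections.emptyList(), -1, Optional.of(racks[i % 3])));
>         }
> 
>         // On affected versions this can throw the IllegalStateException quoted above.
>         new StickyAssignor().assign(cluster, new GroupSubscription(subscriptions));
>     }
> }
> {code}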
> A more general reproduction we performed showed a ~1/500 failure rate using 
> a 96-partition topic in a cluster with 4 nodes across 3 hosts on 3 racks, 
> consumed by 13 consumers spread evenly across the 3 racks, with each 
> partition randomly replicated to 1-3 nodes (all fully in sync). The 
> expectation here is that any number of replicas >= 1 should be assignable 
> correctly, even if the assignment is not rack-sympathetic in 100% of 
> scenarios. 
>  
> An actual assignment from one of the sampled scenarios above, on 3.6.1, ended 
> with these assignments:
> 2 consumers with 7 partitions
> 3 consumers with 6 partitions
> 8 consumers with 8 partitions
>  
> So, at least on that version, the assignor appears to be over-assigning some 
> (3) consumers, leaving some (3) consumers under-assigned and failing the 
> minQuota check. 
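>  
> As a rough sanity check on the quota math (our own arithmetic, not output 
> from the assignor): 96 partitions over 13 consumers gives minQuota = 7 and 
> maxQuota = 8, with 96 % 13 = 5 members expected at maxQuota, so the split 
> above sums to 96 partitions but violates both bounds:
> {code:java}
> public class QuotaCheck {
>     public static void main(String[] args) {
>         int partitions = 96, consumers = 13;
>         int minQuota = partitions / consumers;                   // 7
>         int maxQuota = (partitions + consumers - 1) / consumers; // 8
>         int expectedAtMaxQuota = partitions % consumers;         // 5, but 8 consumers got 8
>         int observedTotal = 2 * 7 + 3 * 6 + 8 * 8;               // 96, yet 3 consumers sit below minQuota
>         System.out.printf("minQuota=%d maxQuota=%d expectedAtMaxQuota=%d observedTotal=%d%n",
>                 minQuota, maxQuota, expectedAtMaxQuota, observedTotal);
>     }
> }
> {code}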
>  
>  



