[ 
https://issues.apache.org/jira/browse/KAFKA-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9885.
------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Evict last members of a group when the maximum allowed is reached
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9885
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9885
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 2.6.0
>
>
> While analysing https://issues.apache.org/jira/browse/KAFKA-7965, we found 
> that multiple members of a group can be evicted from a group if the leader of 
> the consumer offset partition changes before the group is persisted. This 
> happens because the current evection logic always evict the first member 
> which rejoins the group.
> We would like to change the evection logic so that the last members to rejoin 
> the group are kicked out instead.
> Here is an example of what happens when the leader changes:
> {noformat}
> // Group is loaded in GroupCoordinator 0
> // A rebalance is triggered because the group is over capacity
> [2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-0 
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer, 
> groupId=group-max-size-test] Discovered group coordinator localhost:40071 
> (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
> [2020-04-02 11:14:33,409] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,410] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,412] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata 
> for group-max-size-test with generation 1 
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group group-max-size-test in state PreparingRebalance with old generation 1 
> (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity 
> (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a 
> chance for consumers to commit offsets) 
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished 
> loading offsets and group metadata from __consumer_offsets-0 in 28 
> milliseconds, of which 0 milliseconds was spent in the scheduler. 
> (kafka.coordinator.group.GroupMetadataManager:66)
> // A first consumer is kicked out of the group while trying to re-join
> [2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=group-max-size-test] Attempt to join group failed due to fatal error: 
> The consumer group has reached its max size. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
> [2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
> org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
> group-max-size-test already has the configured maximum number of members.
> [2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)
> // Before the rebalance is completed, a preferred replica leader election 
> kicks in and move the leader from 0 to 1
> [2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic 
> preferred replica leader election (kafka.controller.KafkaController:66)
> [2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader 
> election (PREFERRED) for partitions 
> group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by 
> AutoTriggered (kafka.controller.KafkaController:66)
> // The group is loaded in GroupCoordinator 1 before completing the rebalance
> // Another rebalance is triggered because the group is still over capacity
> [2020-04-02 11:14:34,194] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-0 
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:34,199] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,199] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,199] INFO Static member 
> MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
>  groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
> supportedProtocols=List(range), ).groupInstanceId of group 
> group-max-size-test loaded with member id 
> ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1. 
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,201] INFO [GroupCoordinator 1]: Loading group metadata 
> for group-max-size-test with generation 1 
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,202] INFO [GroupCoordinator 1]: Preparing to rebalance 
> group group-max-size-test in state PreparingRebalance with old generation 1 
> (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity 
> (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a 
> chance for consumers to commit offsets) 
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,203] INFO [GroupMetadataManager brokerId=1] Finished 
> loading offsets and group metadata from __consumer_offsets-0 in 9 
> milliseconds, of which 0 milliseconds was spent in the scheduler. 
> (kafka.coordinator.group.GroupMetadataManager:66)
> // Prefered leader election is completed
> [2020-04-02 11:14:34,235] INFO [Controller id=0] Partition 
> __consumer_offsets-0 completed preferred replica leader election. New leader 
> is 1 (kafka.controller.KafkaController:66)
> // Group is unloaded from GroupCoordinator 0
> [2020-04-02 11:14:34,237] INFO [GroupMetadataManager brokerId=0] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-0 
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:34,237] INFO [GroupCoordinator 0]: Unloading group metadata 
> for group-max-size-test with generation 1 
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,238] INFO [GroupMetadataManager brokerId=0] Finished 
> unloading __consumer_offsets-0. Removed 0 cached offsets and 1 cached groups. 
> (kafka.coordinator.group.GroupMetadataManager:66)
> // A second consumer is kicked out of the group while trying to re-join
> [2020-04-02 11:14:34,252] ERROR [Consumer clientId=ConsumerTestConsumer, 
> groupId=group-max-size-test] Attempt to join group failed due to fatal error: 
> The consumer group has reached its max size. 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
> [2020-04-02 11:14:34,254] ERROR [daemon-consumer-assignment-1]: Error due to 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
> org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
> group-max-size-test already has the configured maximum number of members.
> [2020-04-02 11:14:34,254] INFO [daemon-consumer-assignment-1]: Stopped 
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66) {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to