David Jacot created KAFKA-9885:
----------------------------------

             Summary: Evict last members of a group when the maximum allowed is 
reached
                 Key: KAFKA-9885
                 URL: https://issues.apache.org/jira/browse/KAFKA-9885
             Project: Kafka
          Issue Type: Bug
            Reporter: David Jacot
            Assignee: David Jacot


While analysing https://issues.apache.org/jira/browse/KAFKA-7965, we found that 
multiple members can be evicted from a group if the leader of the consumer 
offset partition changes before the group is persisted. This happens because 
the current eviction logic always evicts the first member which rejoins the 
group.

We would like to change the eviction logic so that the last members to rejoin 
the group are kicked out instead.
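
To make the difference between the two policies concrete, here is a minimal 
Java sketch of the scenario described above. It is not the GroupCoordinator 
code: the class EvictionSketch, the Policy enum, the Rebalance helper and the 
simulate method are all hypothetical names, and the model assumes that a 
rejected member never retries and that the new coordinator reloads the 
original, unpersisted member list.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model, not Kafka code.
public class EvictionSketch {
    public enum Policy { EVICT_FIRST_TO_REJOIN, EVICT_LAST_TO_REJOIN }

    // One coordinator's view of an over-capacity group during a rebalance.
    static final class Rebalance {
        private final int maxSize;
        private final Policy policy;
        private final int loadedSize;
        private final Set<String> rejoined = new LinkedHashSet<>();
        private int evicted = 0;

        Rebalance(int maxSize, Policy policy, int loadedSize) {
            this.maxSize = maxSize;
            this.policy = policy;
            this.loadedSize = loadedSize;
        }

        // Returns true if the member is accepted, false if it is rejected.
        boolean rejoin(String memberId) {
            boolean reject;
            if (policy == Policy.EVICT_FIRST_TO_REJOIN) {
                // Reject while the loaded group, minus evictions so far,
                // is still over the limit.
                reject = loadedSize - evicted > maxSize;
            } else {
                // Accept until the limit is reached; only late joiners fail.
                reject = rejoined.size() >= maxSize;
            }
            if (reject) {
                evicted++;
                return false;
            }
            rejoined.add(memberId);
            return true;
        }
    }

    // The first rejoinsBeforeFailover members rejoin on the old coordinator,
    // then the partition leader changes and every live member rejoins on the
    // new one. Returns the evicted members in eviction order.
    public static List<String> simulate(Policy policy, int maxSize,
                                        List<String> members,
                                        int rejoinsBeforeFailover) {
        List<String> evicted = new ArrayList<>();
        List<String> alive = new ArrayList<>(members);

        Rebalance first = new Rebalance(maxSize, policy, members.size());
        for (String m : members.subList(0, rejoinsBeforeFailover)) {
            if (!first.rejoin(m)) {
                evicted.add(m);
                alive.remove(m);
            }
        }

        // Failover: the group was not persisted, so the new coordinator
        // loads it with every original member again.
        Rebalance second = new Rebalance(maxSize, policy, members.size());
        for (String m : new ArrayList<>(alive)) {
            if (!second.rejoin(m)) {
                evicted.add(m);
                alive.remove(m);
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<String> members = List.of("A", "B", "C");
        // Prints [A, B]: one eviction per coordinator.
        System.out.println(simulate(Policy.EVICT_FIRST_TO_REJOIN, 2, members, 1));
        // Prints [C]: only the last member to rejoin is evicted.
        System.out.println(simulate(Policy.EVICT_LAST_TO_REJOIN, 2, members, 1));
    }
}
```

With three members and a maximum size of 2, the first policy evicts two 
members across the leader change and leaves the group below capacity, while 
the second evicts only one.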

Here is an example of what happens when the leader changes:
{noformat}
// Group is loaded in GroupCoordinator 0
// A rebalance is triggered because the group is over capacity
[2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] Discovered group coordinator localhost:40071 (id: 
2147483647 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
[2020-04-02 11:14:33,409] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,410] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,412] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata for 
group-max-size-test with generation 1 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance 
group group-max-size-test in state PreparingRebalance with old generation 1 
(__consumer_offsets-0) (reason: Freshly-loaded group is over capacity 
(GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a 
chance for consumers to commit offsets) 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished 
loading offsets and group metadata from __consumer_offsets-0 in 28 
milliseconds, of which 0 milliseconds was spent in the scheduler. 
(kafka.coordinator.group.GroupMetadataManager:66)

// A first consumer is kicked out of the group while trying to re-join
[2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] Attempt to join group failed due to fatal error: 
The consumer group has reached its max size. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
[2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
group-max-size-test already has the configured maximum number of members.
[2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)

// Before the rebalance is completed, a preferred replica leader election kicks 
in and moves the leader from 0 to 1
[2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic preferred 
replica leader election (kafka.controller.KafkaController:66)
[2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader 
election (PREFERRED) for partitions 
group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by 
AutoTriggered (kafka.controller.KafkaController:66)

// The group is loaded in GroupCoordinator 1 before completing the rebalance
// Another rebalance is triggered because the group is still over capacity
[2020-04-02 11:14:34,194] INFO [GroupMetadataManager brokerId=1] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:34,199] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,199] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,199] INFO Static member 
MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
 groupInstanceId=Some(null), clientId=ConsumerTestConsumer, 
clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000, 
supportedProtocols=List(range), ).groupInstanceId of group group-max-size-test 
loaded with member id ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d 
at generation 1. (kafka.coordinator.group.GroupMetadata$:126)
[2020-04-02 11:14:34,201] INFO [GroupCoordinator 1]: Loading group metadata for 
group-max-size-test with generation 1 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,202] INFO [GroupCoordinator 1]: Preparing to rebalance 
group group-max-size-test in state PreparingRebalance with old generation 1 
(__consumer_offsets-0) (reason: Freshly-loaded group is over capacity 
(GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a 
chance for consumers to commit offsets) 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,203] INFO [GroupMetadataManager brokerId=1] Finished 
loading offsets and group metadata from __consumer_offsets-0 in 9 milliseconds, 
of which 0 milliseconds was spent in the scheduler. 
(kafka.coordinator.group.GroupMetadataManager:66)

// Preferred leader election is completed
[2020-04-02 11:14:34,235] INFO [Controller id=0] Partition __consumer_offsets-0 
completed preferred replica leader election. New leader is 1 
(kafka.controller.KafkaController:66)

// Group is unloaded from GroupCoordinator 0
[2020-04-02 11:14:34,237] INFO [GroupMetadataManager brokerId=0] Scheduling 
unloading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager:66)
[2020-04-02 11:14:34,237] INFO [GroupCoordinator 0]: Unloading group metadata 
for group-max-size-test with generation 1 
(kafka.coordinator.group.GroupCoordinator:66)
[2020-04-02 11:14:34,238] INFO [GroupMetadataManager brokerId=0] Finished 
unloading __consumer_offsets-0. Removed 0 cached offsets and 1 cached groups. 
(kafka.coordinator.group.GroupMetadataManager:66)

// A second consumer is kicked out of the group while trying to re-join
[2020-04-02 11:14:34,252] ERROR [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] Attempt to join group failed due to fatal error: 
The consumer group has reached its max size. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
[2020-04-02 11:14:34,254] ERROR [daemon-consumer-assignment-1]: Error due to 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
group-max-size-test already has the configured maximum number of members.
[2020-04-02 11:14:34,254] INFO [daemon-consumer-assignment-1]: Stopped 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66) {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
