[ https://issues.apache.org/jira/browse/KAFKA-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-9885.
------------------------------------
Fix Version/s: 2.6.0
Resolution: Fixed

> Evict last members of a group when the maximum allowed is reached
> -----------------------------------------------------------------
>
> Key: KAFKA-9885
> URL: https://issues.apache.org/jira/browse/KAFKA-9885
> Project: Kafka
> Issue Type: Bug
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
> Fix For: 2.6.0
>
>
> While analysing https://issues.apache.org/jira/browse/KAFKA-7965, we found
> that multiple members can be evicted from a group if the leader of the
> consumer offsets partition changes before the group is persisted. This
> happens because the current eviction logic always evicts the first member
> that rejoins the group.
> We would like to change the eviction logic so that the last members to rejoin
> the group are kicked out instead.
> Here is an example of what happens when the leader changes:
> {noformat}
> // Group is loaded in GroupCoordinator 0
> // A rebalance is triggered because the group is over capacity
> [2020-04-02 11:14:33,393] INFO [GroupMetadataManager brokerId=0] Scheduling
> loading of offsets and group metadata from __consumer_offsets-0
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:33,406] INFO [Consumer clientId=ConsumerTestConsumer,
> groupId=group-max-size-test] Discovered group coordinator localhost:40071
> (id: 2147483647 rack: null)
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:794)
> [2020-04-02 11:14:33,409] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,410] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,412] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Loading group metadata
> for group-max-size-test with generation 1
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:33,413] INFO [GroupCoordinator 0]: Preparing to rebalance
> group group-max-size-test in state PreparingRebalance with old generation 1
> (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity
> (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a
> chance for consumers to commit offsets)
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:33,431] INFO [GroupMetadataManager brokerId=0] Finished
> loading offsets and group metadata from __consumer_offsets-0 in 28
> milliseconds, of which 0 milliseconds was spent in the scheduler.
> (kafka.coordinator.group.GroupMetadataManager:66)
> // A first consumer is kicked out of the group while trying to re-join
> [2020-04-02 11:14:33,449] ERROR [Consumer clientId=ConsumerTestConsumer,
> groupId=group-max-size-test] Attempt to join group failed due to fatal error:
> The consumer group has reached its max size.
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
> [2020-04-02 11:14:33,451] ERROR [daemon-consumer-assignment-2]: Error due to
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
> org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group
> group-max-size-test already has the configured maximum number of members.
> [2020-04-02 11:14:33,451] INFO [daemon-consumer-assignment-2]: Stopped
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)
> // Before the rebalance is completed, a preferred replica leader election
> kicks in and moves the leader from 0 to 1
> [2020-04-02 11:14:34,155] INFO [Controller id=0] Processing automatic
> preferred replica leader election (kafka.controller.KafkaController:66)
> [2020-04-02 11:14:34,169] INFO [Controller id=0] Starting replica leader
> election (PREFERRED) for partitions
> group-max-size-test-0,group-max-size-test-3,__consumer_offsets-0 triggered by
> AutoTriggered (kafka.controller.KafkaController:66)
> // The group is loaded in GroupCoordinator 1 before completing the rebalance
> // Another rebalance is triggered because the group is still over capacity
> [2020-04-02 11:14:34,194] INFO [GroupMetadataManager brokerId=1] Scheduling
> loading of offsets and group metadata from __consumer_offsets-0
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:34,199] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-ceaab707-69af-4a65-8275-cb7db7fb66b3 at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,199] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-07077ca2-30e9-45cd-b363-30672281bacb at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,199] INFO Static member
> MemberMetadata(memberId=ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d,
> groupInstanceId=Some(null), clientId=ConsumerTestConsumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=10000, rebalanceTimeoutMs=60000,
> supportedProtocols=List(range), ).groupInstanceId of group
> group-max-size-test loaded with member id
> ConsumerTestConsumer-5d359e65-1f11-43ce-874e-fddf55c0b49d at generation 1.
> (kafka.coordinator.group.GroupMetadata$:126)
> [2020-04-02 11:14:34,201] INFO [GroupCoordinator 1]: Loading group metadata
> for group-max-size-test with generation 1
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,202] INFO [GroupCoordinator 1]: Preparing to rebalance
> group group-max-size-test in state PreparingRebalance with old generation 1
> (__consumer_offsets-0) (reason: Freshly-loaded group is over capacity
> (GroupConfig(10,1800000,2,0).groupMaxSize). Rebalacing in order to give a
> chance for consumers to commit offsets)
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,203] INFO [GroupMetadataManager brokerId=1] Finished
> loading offsets and group metadata from __consumer_offsets-0 in 9
> milliseconds, of which 0 milliseconds was spent in the scheduler.
> (kafka.coordinator.group.GroupMetadataManager:66)
> // Preferred leader election is completed
> [2020-04-02 11:14:34,235] INFO [Controller id=0] Partition
> __consumer_offsets-0 completed preferred replica leader election. New leader
> is 1 (kafka.controller.KafkaController:66)
> // Group is unloaded from GroupCoordinator 0
> [2020-04-02 11:14:34,237] INFO [GroupMetadataManager brokerId=0] Scheduling
> unloading of offsets and group metadata from __consumer_offsets-0
> (kafka.coordinator.group.GroupMetadataManager:66)
> [2020-04-02 11:14:34,237] INFO [GroupCoordinator 0]: Unloading group metadata
> for group-max-size-test with generation 1
> (kafka.coordinator.group.GroupCoordinator:66)
> [2020-04-02 11:14:34,238] INFO [GroupMetadataManager brokerId=0] Finished
> unloading __consumer_offsets-0. Removed 0 cached offsets and 1 cached groups.
> (kafka.coordinator.group.GroupMetadataManager:66)
> // A second consumer is kicked out of the group while trying to re-join
> [2020-04-02 11:14:34,252] ERROR [Consumer clientId=ConsumerTestConsumer,
> groupId=group-max-size-test] Attempt to join group failed due to fatal error:
> The consumer group has reached its max size.
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:627)
> [2020-04-02 11:14:34,254] ERROR [daemon-consumer-assignment-1]: Error due to
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76)
> org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group
> group-max-size-test already has the configured maximum number of members.
> [2020-04-02 11:14:34,254] INFO [daemon-consumer-assignment-1]: Stopped
> (kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:66)
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
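The eviction change requested in this issue can be illustrated with a small model. This is a hypothetical simulation, not Kafka's GroupCoordinator code: the class and method names, the member names A, B and C, and the rejoin orders are all assumptions chosen to mirror the log (three members loaded into a group that appears to allow at most two, and one coordinator failover before the smaller group is persisted). It also assumes that, under the new policy, eviction is deferred until the join phase completes, since that is when "the last members to rejoin" are known.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the two eviction policies discussed in KAFKA-9885.
// All names here are illustrative; this is not Kafka's GroupCoordinator code.
class EvictionPolicySketch {

    // Old policy as described in the issue: while the group is over capacity,
    // each member that rejoins is rejected and removed, so the FIRST rejoiners
    // are evicted. Returns the members evicted during this rebalance.
    static List<String> evictFirstRejoiners(Set<String> loadedMembers,
                                            List<String> rejoinOrder,
                                            int maxSize) {
        Set<String> members = new LinkedHashSet<>(loadedMembers);
        List<String> evicted = new ArrayList<>();
        for (String member : rejoinOrder) {
            if (members.size() > maxSize) {
                members.remove(member);
                evicted.add(member);
            }
        }
        return evicted;
    }

    // Proposed policy (assumed shape): let every member rejoin, then, once the
    // join phase completes, keep the first maxSize rejoiners and evict the rest.
    static List<String> evictLastRejoiners(List<String> rejoinOrder, int maxSize) {
        if (rejoinOrder.size() <= maxSize) {
            return new ArrayList<>();
        }
        return new ArrayList<>(rejoinOrder.subList(maxSize, rejoinOrder.size()));
    }

    public static void main(String[] args) {
        int maxSize = 2;
        // Persisted group state: three members, one more than allowed.
        Set<String> persisted = new LinkedHashSet<>(List.of("A", "B", "C"));
        List<String> rejoinOnCoordinator0 = List.of("B", "A", "C");

        // Old policy on coordinator 0: the first rejoiner is evicted, but the
        // leader moves before the smaller group is persisted.
        List<String> evicted0 = evictFirstRejoiners(persisted, rejoinOnCoordinator0, maxSize);

        // Evicted consumers hit a fatal error and stop, so only the others rejoin
        // on coordinator 1, which reloads the same three-member state.
        List<String> rejoinOnCoordinator1 = new ArrayList<>(rejoinOnCoordinator0);
        rejoinOnCoordinator1.removeAll(evicted0);
        List<String> evicted1 = evictFirstRejoiners(persisted, rejoinOnCoordinator1, maxSize);
        System.out.println("old policy evicted " + evicted0 + " then " + evicted1);

        // New policy: the failover happens before the join phase completes on
        // coordinator 0, so nothing is evicted there; coordinator 1 evicts only
        // the last rejoiner.
        List<String> evictedNew = evictLastRejoiners(rejoinOnCoordinator0, maxSize);
        System.out.println("new policy evicted " + evictedNew);
    }
}
```

Running `main` prints `old policy evicted [B] then [A]` and `new policy evicted [C]`. In this model the old policy loses two consumers across the failover, leaving C as the only live member of a group that allows two, which matches the two `GroupMaxSizeReachedException` errors in the log; evicting the last rejoiners at the end of the join phase loses only one.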