[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-14639:
----------------------------------

    Assignee: Philip Nee

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-14639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14639
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.2.1
>            Reporter: Bojan Blagojevic
>            Assignee: Philip Nee
>            Priority: Major
>         Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if the consumer looses partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Mind that the log is in reverse(bottom to top)
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-57] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-57] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-26, 
> partition-74
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-59]\n\tCurrent owned partitions: [partition-26, 
> partition-74]\n\tAdded partitions (assigned - owned): 
> [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26, 
> partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully synced group in generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-57]
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-57
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded 
> partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned - 
> assigned): [partition-57]
> 2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator : 
> [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
> Successfully synced group in generation Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'} {code}
> Is this expected?
> Kafka client version is 3.2.1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to