[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680275#comment-17680275
 ] 

Bojan Blagojevic commented on KAFKA-14639:
------------------------------------------

Thank you again for the quick response. I will try to answer your questions.
h5. Answers:
??1. Can you check and/or provide all the logs from consumer-3 between gen 639 
and gen 640? Is there anything in there about resetting the generation, 
dropping out of the group, resetting the member id, anything at all like that???
I don't see anything like that happening between gen 639 and gen 640. I attached 
a log excerpt covering a few surrounding generations [^consumers-jira.log].

??2. The only other thing off the top of my head to check would be that every 
single consumer was configured with (only) the CooperativeStickyAssignor over 
the full period from gen 639 through the end of gen 640, or at least check the 
group leader (consumer-5 I think?) and consumers 3 & 4.??
The full logs have unfortunately expired, but I am pretty sure that all the 
consumers were configured with only `CooperativeStickyAssignor`. They are part 
of a Kubernetes deployment in which all the pods share the same configuration. I 
observed correct group leader behaviour when ownership of other partitions 
changed. For example, I followed the ownership changes when *partition-68* was moved.
It belonged to the consumer partition-2-6b9db8686f-hswvn... in generation 639.
{code:java}
Final assignment of partitions to consumers:
partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=[partition-20,
 partition-68]
...
Finished assignment for group at generation 639:
partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=Assignment(partitions=[partition-20,
 partition-68]) {code}
Between generation 639 and generation 640 a new pod, pod-6b9db8686f-p87m9, joins. 
In generation 640, one of the Kafka consumers that belongs to this pod, 
partition-3-6b9db8686f-p87m9..., gets partition-68 assigned, as logged in 
AbstractStickyAssignor.constrainedAssign.
{code:java}
Final assignment of partitions to consumers:
partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=[partition-68]{code}
Since this partition is changing ownership, it does not show up in the log of 
ConsumerCoordinator, which is expected.
{code:java}
Finished assignment for group at generation 640:
partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=Assignment(partitions=[]){code}
And it gets assigned in generation 641:
 
{code:java}
Final assignment of partitions to consumers: 
partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=[partition-68]
 
... 
Finished assignment for group at generation 641: 
partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=Assignment(partitions=[partition-68]){code}
It gets assigned to a different consumer than the one determined in 
generation 640, but this does not break the rebalance barrier.
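
The hand-off of partition-68 traced above can be sketched as a filtering step applied to the assignor's target assignment. This is only a minimal illustration under my own assumptions, not Kafka's actual code: the class {{CooperativeAdjustSketch}}, the method {{adjust}}, the shortened member names and the old owner's target list are all hypothetical. It shows why a partition that is still owned by another member is withheld from its new owner in the same generation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka's implementation or API.
class CooperativeAdjustSketch {

    // Drops from the target assignment every partition that another member still owns,
    // so the new owner only receives it after the old owner has revoked it and rejoined.
    static Map<String, List<String>> adjust(Map<String, List<String>> owned,
                                            Map<String, List<String>> target) {
        // Index the current owner of each partition.
        Map<String, String> currentOwner = new HashMap<>();
        owned.forEach((member, partitions) ->
                partitions.forEach(p -> currentOwner.put(p, member)));

        Map<String, List<String>> adjusted = new LinkedHashMap<>();
        target.forEach((member, partitions) -> {
            List<String> kept = new ArrayList<>();
            for (String p : partitions) {
                String owner = currentOwner.get(p);
                // Keep the partition only if it is unowned or already owned by this member.
                if (owner == null || owner.equals(member)) {
                    kept.add(p);
                }
            }
            adjusted.put(member, kept);
        });
        return adjusted;
    }

    public static void main(String[] args) {
        // Ownership going into the rebalance (member names shortened for illustration).
        Map<String, List<String>> owned = new LinkedHashMap<>();
        owned.put("hswvn", List.of("partition-20", "partition-68"));
        owned.put("p87m9", List.of());

        // Target computed by the assignor: partition-68 should move to the new member.
        Map<String, List<String>> target = new LinkedHashMap<>();
        target.put("hswvn", List.of("partition-20"));
        target.put("p87m9", List.of("partition-68"));

        System.out.println(adjust(owned, target));
    }
}
```

In this sketch the new member's list comes out empty in the first round because partition-68 is still owned by the old member, which corresponds to the {{Assignment(partitions=[])}} line logged for generation 640. Once the old owner has revoked the partition and rejoined, nobody owns it, so the following round can hand it to whichever member the assignor picks.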
h5. Additional notes
Not sure if it matters, but I saw several consumers logging:
{code:java}
... org.apache.kafka.clients.Metadata ... Resetting the last seen epoch of 
partition partition-74 to 149 since the associated topicId changed from null to 
nixqUZnpQYWjY0RreaCczA" {code}
 
I think that the heartbeat (HB) thread (I am assuming this based on the [code I've 
read|https://github.com/apache/kafka/blob/3.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1193])
 is requesting the join on behalf of the consumers. Again, not sure if it matters:
{code:java}
"2022-12-14 11:17:48 1 --- [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-4-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:48 1 --- [consumer-2] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-2-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:48 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-4-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:48 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-2-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:47 1 --- [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-3-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:47 1 --- [consumer-1] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-1-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:47 1 --- [consumer-0] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-0-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:46 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-0-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:46 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-3-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:45 1 --- [consumer-5] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-5-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:45 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-1-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:45 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-5-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing" {code}
 

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-14639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14639
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.2.1
>            Reporter: Bojan Blagojevic
>            Priority: Major
>         Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if a consumer loses a partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and a few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on the Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Note that the log is in reverse order (bottom to top):
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-57] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-57] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-26, 
> partition-74
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-59]\n\tCurrent owned partitions: [partition-26, 
> partition-74]\n\tAdded partitions (assigned - owned): 
> [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26, 
> partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully synced group in generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-57]
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-57
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded 
> partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned - 
> assigned): [partition-57]
> 2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator : 
> [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
> Successfully synced group in generation Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'} {code}
> Is this expected?
> Kafka client version is 3.2.1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
