[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636769#comment-17636769
 ] 

Mikael commented on KAFKA-14362:
--------------------------------

I think I have identified the root cause of the duplication:

KafkaConsumer.poll(Duration) immediately returns records from a previous 
fetch, even when a rebalance has been initiated. Any offset commit attempted 
before the rebalance has completed will fail. If any of the offsets being 
committed belong to a partition that is subsequently revoked from the 
consumer, the commit will also fail when retried after the rebalance has 
completed (which is what spring-kafka does).

By contrast, the deprecated method KafkaConsumer.poll(long) awaits completion 
of rebalance before returning data.
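
To make the failing retry concrete, here is a minimal sketch of one way a 
client could avoid it: drop the offsets of partitions that are no longer 
assigned before retrying the commit. This is an assumption for illustration 
only, not what spring-kafka or kafka-clients does. CommitRetryFilter and 
retainAssigned are hypothetical names, partitions are plain strings rather 
than TopicPartition objects, and the offsets are made up. It only avoids the 
failing retry; the records for the revoked partitions can still be consumed 
again by their new owner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of kafka-clients or spring-kafka.
final class CommitRetryFilter {

    // Returns a copy of pendingOffsets restricted to partitions that are
    // still assigned after the rebalance. The input map is not modified.
    static Map<String, Long> retainAssigned(Map<String, Long> pendingOffsets,
                                            Set<String> assignedPartitions) {
        Map<String, Long> retained = new HashMap<>();
        for (Map.Entry<String, Long> entry : pendingOffsets.entrySet()) {
            if (assignedPartitions.contains(entry.getKey())) {
                retained.put(entry.getKey(), entry.getValue());
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        // Offsets are illustrative values, not taken from the logs.
        Map<String, Long> pending = new HashMap<>();
        pending.put("messages.xms.mt.batch.report-3", 100L);
        pending.put("messages.xms.mt.batch.report-4", 200L); // revoked in the rebalance

        Set<String> assigned = Set.of(
                "messages.xms.mt.batch.report-0",
                "messages.xms.mt.batch.report-1",
                "messages.xms.mt.batch.report-2",
                "messages.xms.mt.batch.report-3");

        // prints {messages.xms.mt.batch.report-3=100}
        System.out.println(retainAssigned(pending, assigned));
    }
}
```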

I reran my application restart test using a spring-kafka snapshot that calls 
KafkaConsumer.poll(long) instead of KafkaConsumer.poll(Duration), and saw no 
duplications in 10 runs. Previously, the same test produced duplications in 
2-3 runs out of 10.

As expected, I am also no longer seeing these logs after the change:
{code:java}
2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:1156] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Failing OffsetCommit request since the consumer is not part of an active group 
{code}
The issue appears to be that records that have already been fetched are 
returned immediately, before it is known whether the rebalance will revoke 
the partitions those records belong to, so the returned records may already 
be stale.

In contrast, the deprecated method waits indefinitely for rebalancing to 
complete. Couldn't there instead be a timeout on waiting for the rebalance 
completion?
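
As a rough sketch of what such a bounded wait could look like, assuming a 
flag that reports whether a rebalance is still in progress: the loop below 
waits until the rebalance finishes or a timeout expires, whichever comes 
first. All names here (BoundedRebalanceWait, awaitCompletion, driveIo) are 
hypothetical and none of them exist in kafka-clients. What poll should do 
with already-fetched records when the wait times out is left open.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical illustration only; not an API of kafka-clients.
final class BoundedRebalanceWait {

    // Waits until rebalanceInProgress reports false or timeoutMs has elapsed
    // according to clockMs. driveIo stands in for whatever work advances the
    // rebalance (network I/O in a real client). Returns true if the rebalance
    // completed within the timeout, false if the wait timed out.
    static boolean awaitCompletion(BooleanSupplier rebalanceInProgress,
                                   Runnable driveIo,
                                   LongSupplier clockMs,
                                   long timeoutMs) {
        long deadline = clockMs.getAsLong() + timeoutMs;
        while (rebalanceInProgress.getAsBoolean()) {
            if (clockMs.getAsLong() >= deadline) {
                return false; // timed out while the rebalance was still running
            }
            driveIo.run();
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated rebalance that completes after three checks.
        int[] checks = {0};
        boolean completed = awaitCompletion(
                () -> checks[0]++ < 3,
                () -> { },
                System::currentTimeMillis,
                1000L);
        System.out.println("rebalance completed in time: " + completed);
    }
}
```

Passing the clock in as a parameter keeps the sketch testable without real 
sleeping; a caller would normally supply System::currentTimeMillis.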

> Same message consumed by two consumers in the same group  after client restart
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.1.1
>         Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>            Reporter: Mikael
>            Priority: Major
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one 
> consumer each, consuming from the same 8-partition topic using the same group 
> ID. Duplicate consumption of a handful of messages sometimes happens right 
> after one of the application instances has been restarted.
> Additional information:
> Messages are produced to the topic by a Kafka Streams topology deployed on 
> four application instances. I have verified that each message is only 
> produced once by enabling debug logging in the topology flow right before 
> producing each message to the topic.
> Example logs below are from a test run in which a batch of 11 messages was 
> consumed at 10:28:26,771 on the restarted instance and 9 of them were 
> consumed as part of a larger batch at 10:28:23,824 on the other instance. 
> Application shutdown was initiated at 10:27:13,086 and completed at 
> 10:27:15,164; startup was initiated at 10:28:05,029 and completed at 
> 10:28:37,491.
> Kafka consumer group logs after restart on the instance that was restarted:
>  
> {code:java}
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata 
> [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, 
> groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:853] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Discovered group coordinator 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 
> 2147483646 rack: null)
> 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: need to re-join with the given member-id
> 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       []
>         Current owned partitions:                  []
>         Added partitions (assigned - owned):       []
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new Assignment(partitions=[])
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Adding newly assigned partitions: 
> 2022-11-07 10:28:25,315 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: group is already rebalancing
> 2022-11-07 10:28:25,317 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:25,319 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=677, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:25,327 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=677, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:25,327 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
>         Current owned partitions:                  []
>         Added partitions (assigned - owned):       
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:28:25,327 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new 
> Assignment(partitions=[messages.xms.mt.batch.report-4, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-7])
> 2022-11-07 10:28:25,327 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Adding newly assigned partitions: messages.xms.mt.batch.report-4, 
> messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, 
> messages.xms.mt.batch.report-7
> 2022-11-07 10:28:25,365 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-4 to the committed 
> offset FetchPosition{offset=55456568, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 2 rack: use1-az1)], epoch=55}}
> 2022-11-07 10:28:25,365 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-6 to the committed 
> offset FetchPosition{offset=55817359, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 1 rack: use1-az6)], epoch=53}}
> 2022-11-07 10:28:25,370 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-5 to the committed 
> offset FetchPosition{offset=55700347, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 3 rack: use1-az4)], epoch=53}}
> 2022-11-07 10:28:25,371 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-7 to the committed 
> offset FetchPosition{offset=55642775, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 2 rack: use1-az1)], epoch=55}} {code}
> Kafka consumer group logs before and after the restart on the other instance:
>  
>  
> {code:java}
> 2022-11-07 10:27:14,618 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: group is already rebalancing
> 2022-11-07 10:27:14,622 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:27:14,623 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=675, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:27:14,623 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:654] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Finished assignment for group at generation 675: 
> {consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd=Assignment(partitions=[messages.xms.mt.batch.report-4,
>  messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-7, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])}
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=675, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-3, 
> messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, 
> messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-7]
>         Current owned partitions:                  
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
>         Added partitions (assigned - owned):       
> [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new 
> Assignment(partitions=[messages.xms.mt.batch.report-4, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-7, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Adding newly assigned partitions: messages.xms.mt.batch.report-3, 
> messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-1
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-3 to the committed 
> offset FetchPosition{offset=55784517, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 1 rack: use1-az6)], epoch=53}}
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-0 to the committed 
> offset FetchPosition{offset=55502621, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 3 rack: use1-az4)], epoch=66}}
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-2 to the committed 
> offset FetchPosition{offset=55713067, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 3 rack: use1-az4)], epoch=54}}
> 2022-11-07 10:27:14,627 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:846] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Setting offset for partition messages.xms.mt.batch.report-1 to the committed 
> offset FetchPosition{offset=55457508, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094
>  (id: 2 rack: use1-az1)], epoch=55}}
> 2022-11-07 10:28:23,810 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: group is already rebalancing
> 2022-11-07 10:28:23,813 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,824 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:1156] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Failing OffsetCommit request since the consumer is not part of an active group
> 2022-11-07 10:28:23,824 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation
> {generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,825 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:654] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Finished assignment for group at generation 676: 
> {consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243=Assignment(partitions=[]),
>  
> consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd=Assignment(partitions=[messages.xms.mt.batch.report-0,
>  messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])}
> 2022-11-07 10:28:23,828 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,829 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       
> [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
>         Current owned partitions:                  
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-3, 
> messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, 
> messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-7]
>         Added partitions (assigned - owned):       []
>         Revoked partitions (owned - assigned):     
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
> 2022-11-07 10:28:23,829 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:310] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Revoke previously assigned partitions messages.xms.mt.batch.report-4, 
> messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, 
> messages.xms.mt.batch.report-7
> 2022-11-07 10:28:23,830 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: need to revoke partitions 
> [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7] as indicated 
> by the current assignment and re-join
> 2022-11-07 10:28:23,830 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new 
> Assignment(partitions=[messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])
> 2022-11-07 10:28:23,830 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Adding newly assigned partitions: 
> 2022-11-07 10:28:23,830 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,849 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:1156] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Failing OffsetCommit request since the consumer is not part of an active group
> …
> Repeated 211 times
> …
> 2022-11-07 10:28:25,342 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:1156] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Failing OffsetCommit request since the consumer is not part of an active group
> 2022-11-07 10:28:25,342 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=677, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:25,343 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:654] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Finished assignment for group at generation 677: 
> {consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243=Assignment(partitions=[messages.xms.mt.batch.report-4,
>  messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-6, 
> messages.xms.mt.batch.report-7]), 
> consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd=Assignment(partitions=[messages.xms.mt.batch.report-0,
>  messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])}
> 2022-11-07 10:28:25,347 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=677, 
> memberId='consumer-xms-batch-mt-callback-3-22806c53-d6ce-4f58-b86d-65496f7a63bd',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:25,347 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       
> [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
>         Current owned partitions:                  
> [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
>         Added partitions (assigned - owned):       []
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:28:25,347 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new 
> Assignment(partitions=[messages.xms.mt.batch.report-0, 
> messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-2, 
> messages.xms.mt.batch.report-3])
> 2022-11-07 10:28:25,347 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Adding newly assigned partitions:  {code}
> Consumer configuration on restarted instance:
>  
>  
> {code:java}
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = 
> [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, 
> b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = consumer-xms-batch-mt-callback-3
>         client.rack = 
>         connections.max.idle.ms = 540000
>         default.api.timeout.ms = 60000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = xms-batch-mt-callback
>         group.instance.id = null
>         heartbeat.interval.ms = 1500
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_committed
>         key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 1000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 30000
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 10000
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = GSSAPI
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = SSL
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 6000
>         socket.connection.setup.timeout.max.ms = 30000
>         socket.connection.setup.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         ssl.keystore.key = null
>         ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
>         ssl.keystore.password = [hidden]
>         ssl.keystore.type = JKS
>         ssl.protocol = TLSv1.3
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.certificates = null
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class 
> com.sinch.xms.model.kafka.serdes.MtBatchDeliveryReportDeserializer {code}
> Consumer configuration on the other instance:
> {code:java}
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = 
> [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, 
> b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = consumer-xms-batch-mt-callback-3
>         client.rack = 
>         connections.max.idle.ms = 540000
>         default.api.timeout.ms = 60000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = xms-batch-mt-callback
>         group.instance.id = null
>         heartbeat.interval.ms = 1500
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_committed
>         key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 1000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 30000
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 10000
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = GSSAPI
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = SSL
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 6000
>         socket.connection.setup.timeout.max.ms = 30000
>         socket.connection.setup.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         ssl.keystore.key = null
>         ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
>         ssl.keystore.password = [hidden]
>         ssl.keystore.type = JKS
>         ssl.protocol = TLSv1.3
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.certificates = null
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class 
> com.sinch.xms.model.kafka.serdes.MtBatchDeliveryReportDeserializer {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
