[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-07 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629832#comment-17629832
 ] 

Mikael commented on KAFKA-14362:


The main thing that has caught my attention is the tight loop of 'Failing 
OffsetCommit request since the consumer is not part of an active group' 
messages for the consumer that is not restarted. Could it have given up on 
committing the offset?

> Same message consumed by two consumers in the same group after client restart
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14362
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.1.1
>         Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>            Reporter: Mikael
>            Priority: Major
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one 
> consumer each, consuming from the same 8-partition topic using the same group 
> ID. Duplicate consumption of a handful of messages sometimes happens right 
> after one of the application instances has been restarted.
> Additional information:
> Messages are produced to the topic by a Kafka Streams topology deployed on 
> four application instances. I have verified that each message is only 
> produced once by enabling debug logging in the topology flow right before 
> producing each message to the topic.
> Example logs below are from a test run in which a batch of 11 messages was 
> consumed at 10:28:26,771 on the restarted instance and 9 of them were 
> consumed as part of a larger batch at 10:28:23,824 on the other instance. 
> Application shutdown was initiated at 10:27:13,086 and completed at 
> 10:27:15,164; startup was initiated at 10:28:05,029 and completed at 
> 10:28:37,491.
> Kafka consumer group logs after restart on the instance that was restarted:
>  
> {code:java}
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata 
> [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, 
> groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:853] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Discovered group coordinator 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 
> 2147483646 rack: null)
> 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: need to re-join with the given member-id
> 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       []
>         Current owned partitions:                  []
>         Added partitions (assigned - owned):       []
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:279] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Notifying assignor about the new Assignment(partitions=[])
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:291] [Consumer 
> cl

[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630717#comment-17630717
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


Hey [~Carlstedt], you're right about why this is happening. The restart kicks 
off a rebalance, and while that rebalance is in progress any further attempts 
to commit offsets by other members of the group will fail. After a rebalance, 
if for example a partition is reassigned from consumer A to consumer B, then 
consumer B knows where to resume processing by seeking to the position that 
corresponds to the latest committed offset for that partition.

If consumer A had processed a message right before the rebalance but then 
failed to commit an offset for it, this message will be reprocessed by consumer 
B. Essentially, to Kafka it looks like this message was never fully processed 
by A, and therefore B should make sure to process it again.

In other words, this is the intended behavior: there's no guarantee that a 
message will only be _consumed_ once. Instead Kafka guarantees _at least once_ 
processing semantics, to make sure that every record is fully processed and 
handled by your application logic before moving on to the next record. 
Otherwise it wouldn't be fault tolerant, i.e. you might lose a record 
completely, for example if the app crashed immediately after the record was 
polled from the consumer.
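
To make the replay described above concrete, here is a minimal sketch in plain Java. It is a toy model of a single partition, not the Kafka client API: the class name, the method name and the offsets are all made up for illustration. It only shows that whatever the previous owner processed beyond the last committed position is delivered again to the new owner.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition; none of these names come from the Kafka client API. */
class AtLeastOnceSketch {

    /**
     * Returns the offsets that the new owner of a partition processes a second time.
     *
     * @param lastCommitted  next offset to read according to the last successful commit
     * @param processedUpTo  next offset the previous owner would have read
     *                       (every offset below it was already processed)
     */
    static List<Long> reprocessedOffsets(long lastCommitted, long processedUpTo) {
        List<Long> duplicates = new ArrayList<>();
        // The new owner seeks to the committed position, so every offset the
        // previous owner handled at or beyond that position is delivered again.
        for (long offset = lastCommitted; offset < processedUpTo; offset++) {
            duplicates.add(offset);
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Consumer A last committed position 100, then processed offsets 100..108,
        // but the commit for them was rejected during the rebalance.
        System.out.println(reprocessedOffsets(100, 109));
        // If the commit had succeeded, consumer B would have nothing to replay.
        System.out.println(reprocessedOffsets(109, 109));
    }
}
```

In this model the first call yields the nine offsets 100 through 108 and the second yields an empty list, which is the difference between a failed and a successful commit before the partition changes hands.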


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-09 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631265#comment-17631265
 ] 

Mikael commented on KAFKA-14362:


I would have thought that an orderly rebalance wouldn't cause any duplication. 
I understand that an uncontrolled restart can cause duplication, but in this 
case it's a consumer that just leaves the group and then joins it again later. 
Surely it can't be expected behaviour to randomly duplicate messages in that 
use case? It's also strange that the presumed offset commit failure is not 
reported as a warning or error level log entry.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17632082#comment-17632082
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


{quote}I would have thought that an orderly rebalance wouldn't cause any 
duplication
{quote}
Well, in general it shouldn't, actually, because even if the offset commit 
fails or is prevented while the rebalance is in progress, if any partitions are 
migrated from one consumer to another then the original owner will get the 
chance to finish committing when it revokes those partitions, i.e. during the 
ConsumerRebalanceListener#onPartitionsRevoked callback.

Now that I think of it, that's probably the issue you're experiencing: not 
committing offsets on revocation. If you use the default rebalance listener, 
i.e. don't explicitly pass one in when subscribing to topics, Kafka will 
default to a "no-op" listener that does not do this commit in the callback. Tbh 
this feels like a poor design decision, but it comes down to a core design 
philosophy: to guarantee every record gets fully processed "at least once", 
rather than "at most once".

Anyways, if you're not already utilizing a rebalance listener to commit offsets 
when partitions are revoked, I would definitely start there.
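
As a rough illustration of the commit-on-revocation idea, the sketch below models it in plain Java without the Kafka client library. Everything in it is hypothetical: partitions are plain integers, the "coordinator" is just a map, and the method named onPartitionsRevoked only mimics the role of the ConsumerRebalanceListener callback mentioned above. In a real application the callback would instead issue a synchronous commit through the consumer for the revoked partitions.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of committing on revocation; the types here are hypothetical, not Kafka classes. */
class CommitOnRevokeSketch {

    /** Next offset to read per partition, i.e. everything below it has been processed. */
    final Map<Integer, Long> processedPositions = new HashMap<>();

    /** Stand-in for the committed offsets kept by the group coordinator. */
    final Map<Integer, Long> committedOffsets;

    CommitOnRevokeSketch(Map<Integer, Long> committedOffsets) {
        this.committedOffsets = committedOffsets;
    }

    /** Records that every offset below nextOffset has been processed for the partition. */
    void markProcessed(int partition, long nextOffset) {
        processedPositions.put(partition, nextOffset);
    }

    /** Plays the role of the revocation callback: commit positions of partitions being given up. */
    void onPartitionsRevoked(Collection<Integer> revoked) {
        for (int partition : revoked) {
            Long position = processedPositions.remove(partition);
            if (position != null) {
                // Commit before ownership moves, so the next owner resumes here.
                committedOffsets.put(partition, position);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> coordinator = new HashMap<>();
        CommitOnRevokeSketch consumerA = new CommitOnRevokeSketch(coordinator);
        consumerA.markProcessed(4, 109L);
        consumerA.markProcessed(0, 57L);
        // Only partition 4 is revoked; partition 0 stays with consumer A.
        consumerA.onPartitionsRevoked(List.of(4));
        System.out.println(coordinator);
    }
}
```

Only the revoked partition is committed in the callback; partitions that stay with the consumer keep following its normal commit path.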


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-14 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633763#comment-17633763
 ] 

Mikael commented on KAFKA-14362:


We are using KafkaMessageListenerContainer from spring-kafka, which registers a 
ConsumerRebalanceListener that commits all offsets in its onPartitionsRevoked() 
method. I have now changed the application logic to always commit offsets 
synchronously in the same thread that calls Consumer.poll(), but the 
duplication is still happening.

While trying to understand the Kafka consumer source code, I have one doubt: 
although no new records are fetched from a partition that has been revoked, 
what happens with the records from revoked partitions that have already been 
fetched but not yet returned by Consumer.poll()? Are they filtered out from the 
next poll? Otherwise I imagine they could result in duplication.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-17 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635370#comment-17635370
 ] 

Mikael commented on KAFKA-14362:


When comparing successful test runs with those that generate duplicate 
consumption of messages, I noticed that in the success case, for the consumer 
that gets some of its partitions revoked, there is FIRST this log message:
{code:java}
2022-11-17 07:27:27,141 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:395] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Updating assignment with
        Assigned partitions:                       [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
        Current owned partitions:                  [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-7]
        Added partitions (assigned - owned):       []
        Revoked partitions (owned - assigned):     [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
{code}
and THEN a large number of this type of log message:
{code:java}
2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:1156] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Failing OffsetCommit request since the consumer is not part of an active group
{code}
whereas in the duplication case, at least one of the latter messages is logged 
BEFORE the former one.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635833#comment-17635833
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


Do you ever see anything about partitions being "lost"? Or rebalances 
failing/needing to be retried or restarted in the duplicates case?


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-18 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635921#comment-17635921
 ] 

Mikael commented on KAFKA-14362:


No, no errors or warnings related to rebalancing. I'm currently trying to work 
around it by implementing a ConsumerRebalanceListener and validating that a 
batch doesn't contain any records for revoked partitions before consuming it.
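
A minimal sketch of the kind of check described above, written against plain Java types so it runs without the Kafka client on the classpath. FetchedRecord and RevokedPartitionGuard are hypothetical names, not part of kafka-clients or spring-kafka; in a real consumer the two callbacks would be driven from ConsumerRebalanceListener.onPartitionsRevoked() and onPartitionsAssigned(), which the client invokes on the polling thread.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a consumed record (a real consumer would use ConsumerRecord).
class FetchedRecord {
    final String topic;
    final int partition;
    final long offset;

    FetchedRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical helper: remembers which partitions were revoked and rejects
// any batch that still contains records for one of them.
class RevokedPartitionGuard {
    private final Set<String> revoked = new HashSet<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Intended to be called from onPartitionsRevoked() for each revoked partition.
    void onRevoked(String topic, int partition) {
        revoked.add(key(topic, partition));
    }

    // Intended to be called from onPartitionsAssigned() for each assigned partition.
    void onAssigned(String topic, int partition) {
        revoked.remove(key(topic, partition));
    }

    // True only if no record in the batch belongs to a revoked partition.
    boolean isSafe(List<FetchedRecord> batch) {
        for (FetchedRecord r : batch) {
            if (revoked.contains(key(r.topic, r.partition))) {
                return false;
            }
        }
        return true;
    }
}
```

A check like this can only reject a batch after the revocation callback has run; it cannot catch a batch that was handed to the application before the callback was invoked.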


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636116#comment-17636116
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


Well I mean, regardless of whatever is happening here, failures obviously do 
happen so your handling logic would need to consider the possibility of 
re-consuming offsets anyways.

If you're interested in an example of how to implement exactly-once semantics, 
Kafka Streams does exactly this via transactional producers. There are probably 
hundreds of docs and blog posts on how it works by now. But of course this only 
helps/works when the record processing terminates in another topic.
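
For processing that does not end in another topic, one common way to tolerate re-consumed offsets is to make the handler idempotent. The following is only a rough sketch under stated assumptions: IdempotentHandler is a hypothetical name, and the in-memory map stands in for a durable store that every member of the consumer group can see. With two application instances, as in this ticket, an in-memory map on each instance would not be enough.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an idempotent record handler.
class IdempotentHandler {
    // Assumption: stands in for a durable store shared by all group members.
    // Key: "topic-partition", value: highest offset already processed.
    private final Map<String, Long> highestProcessed = new HashMap<>();

    // Returns true if the side effect ran, false if the record was skipped
    // because its offset had already been processed for that partition.
    boolean handle(String topicPartition, long offset, Runnable sideEffect) {
        Long seen = highestProcessed.get(topicPartition);
        if (seen != null && offset <= seen) {
            return false; // duplicate delivery, skip
        }
        sideEffect.run();
        // In a real system the side effect and this update would need to be
        // applied atomically (for example in one database transaction).
        highestProcessed.put(topicPartition, offset);
        return true;
    }
}
```

The sketch relies on offsets within a partition being processed in increasing order, which is how a single consumer receives them.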


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-19 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636182#comment-17636182
 ] 

Mikael commented on KAFKA-14362:


Workaround didn't help. It seems that the first failing offset commit happens 
before onPartitionsRevoked() is called, due to rebalance in progress. But if 
rebalance is in progress, preventing offsets from being committed, then 
shouldn't KafkaConsumer.poll() return an empty list rather than records that 
can't be committed after consumption?


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-20 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636398#comment-17636398
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


The offset commit failing due to ongoing rebalances shouldn't in itself cause 
problems/duplicates since it can be retried when the rebalance is over (or in 
the #onPartitionsRevoked callback). The consumer can/will continue to poll for 
more records and return them during a rebalance so it can keep processing while 
it's in progress – this was a new feature added with KIP-429 a while back, it's 
part of cooperative rebalancing. If you really want to, you can turn this off 
by switching to a different version of the partition assignor (e.g. plain 
StickyAssignor). However, it's probably best to figure out why this is actually 
happening first, and it also kind of sucks if you have to go back to 
stop-the-world rebalances.

But on that note, I think I finally understand why this is happening – it 
sounds like you aren't updating the offsets to be committed after additional 
records are processed during the rebalance. Why not? Why do you think that 
these records "can't be committed after consumption?" If the partition was 
revoked then you should commit offsets for it in the #onPartitionsRevoked 
callback of course, and any other partition can be committed when the rebalance 
is over. But in both cases you of course need to make sure you're committing 
offsets all the way up to the last consumed/processed message...
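
A minimal sketch of that bookkeeping, assuming plain string keys in place of TopicPartition so it runs without the Kafka client. ProcessedOffsetTracker and its method names are hypothetical. The value stored per partition is the last processed offset plus one, because a committed offset denotes the next record to be consumed.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that tracks how far each partition has been processed.
class ProcessedOffsetTracker {
    // Key: "topic-partition", value: next offset to commit (last processed + 1).
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Call after each record has been fully processed, including records
    // returned by poll() while a rebalance is in progress.
    void markProcessed(String topicPartition, long offset) {
        nextOffsets.merge(topicPartition, offset + 1, Math::max);
    }

    // Offsets to commit from the onPartitionsRevoked() callback. The revoked
    // partitions are removed from tracking because they are no longer owned.
    Map<String, Long> drainRevoked(Collection<String> revokedPartitions) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String tp : revokedPartitions) {
            Long next = nextOffsets.remove(tp);
            if (next != null) {
                toCommit.put(tp, next);
            }
        }
        return toCommit;
    }

    // Offsets for partitions still owned, to commit once the rebalance is over.
    Map<String, Long> pending() {
        return new HashMap<>(nextOffsets);
    }
}
```

In a real consumer the map returned by drainRevoked() would be converted to a Map<TopicPartition, OffsetAndMetadata> and passed to KafkaConsumer.commitSync() inside onPartitionsRevoked(), and pending() would be committed the same way after the rebalance completes.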


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-21 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636769#comment-17636769
 ] 

Mikael commented on KAFKA-14362:


I think I have identified the root cause of the duplication:

KafkaConsumer.poll(Duration) returns records from a previous fetch immediately 
even when a rebalance is initiated. Any offset commit before the rebalance is 
completed will fail, and if any of the committed offsets belong to a partition 
that is subsequently revoked from the consumer, offset commit will also fail if 
retried after the rebalance is completed (which is what spring-kafka does).

By contrast, the deprecated method KafkaConsumer.poll(long) awaits completion 
of rebalance before returning data.

I reran my application restart test using a spring-kafka snapshot which calls 
KafkaConsumer.poll(long) instead of KafkaConsumer.poll(Duration), and didn't 
see any duplications when repeating the test 10 times. When I ran the test 
previously it resulted in duplications 2-3 times out of 10.

As expected, I am also no longer seeing these logs after the change:
{code:java}
2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
[ConsumerCoordinator.java:1156] [Consumer 
clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
Failing OffsetCommit request since the consumer is not part of an active group 
{code}
The issue appears to be that records that have already been fetched are 
returned immediately, before it is known whether the rebalance will revoke the 
partitions that those records belong to, so the returned records may already 
be stale.

In contrast, the deprecated method waits indefinitely for rebalancing to 
complete. Couldn't there instead be a timeout on waiting for the rebalance 
completion?


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-23 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637614#comment-17637614
 ] 

Mikael commented on KAFKA-14362:


This patch in kafka-clients fixed the duplication issue with my application 
restart test case:

[^KAFKA-14362_Do_not_return_any_records_from_KafkaConsumer_poll()_while_rebalance_is_in_prog.patch]

I've run my restart test 20 times repeatedly now without any duplication.

The difference in behaviour would be that KafkaConsumer.poll(Duration) waits 
until the poll timeout and returns no records if a rebalance is still in 
progress, instead of immediately returning records from a previous fetch, if 
there are any left.

Without this change I have actually seen duplication both with streams 
consumers and normal consumers.

> Same message consumed by two consumers in the same group  after client restart
> --
>
> Key: KAFKA-14362
> URL: https://issues.apache.org/jira/browse/KAFKA-14362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.1.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
> Attachments: 
> KAFKA-14362_Do_not_return_any_records_from_KafkaConsumer_poll()_while_rebalance_is_in_prog.patch
>
>

[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-23 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637682#comment-17637682
 ] 

Mikael commented on KAFKA-14362:


Hang on, I just spotted something in spring-kafka: failed offset commits (due 
to rebalancing in progress) are not retried in the onPartitionsRevoked() 
callback. I'm going to try with a modified spring-kafka build and see if that 
fixes the issue.

> Same message consumed by two consumers in the same group  after client restart
> --
>
> Key: KAFKA-14362
> URL: https://issues.apache.org/jira/browse/KAFKA-14362
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 
> 2.8.2, 3.0.2, 3.1.2, 3.2.3
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
> Reporter: Mikael
> Priority: Major
> Attachments: 
> KAFKA-14362_Do_not_return_any_records_from_KafkaConsumer_poll()_while_rebalance_is_in_prog-1.patch,
> KAFKA-14362_Do_not_return_any_records_from_KafkaConsumer_poll()_while_rebalance_is_in_prog.patch
>
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one 
> consumer each, consuming from the same 8-partition topic using the same group 
> ID. Duplicate consumption of a handful of messages sometimes happens right 
> after one of the application instances has been restarted.
>
> Additional information:
> Messages are produced to the topic by a Kafka Streams topology deployed on 
> four application instances. I have verified that each message is only 
> produced once by enabling debug logging in the topology flow right before 
> producing each message to the topic.
> Example logs below are from a test run in which a batch of 11 messages was 
> consumed at 10:28:26,771 on the restarted instance and 9 of them were 
> consumed as part of a larger batch at 10:28:23,824 on the other instance. 
> Application shutdown was initiated at 10:27:13,086 and completed at 
> 10:27:15,164; startup was initiated at 10:28:05,029 and completed at 
> 10:28:37,491.
>
> Kafka consumer group logs after restart on the instance that was restarted:
>  
> {code:java}
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata 
> [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, 
> groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
> 2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:853] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Discovered group coordinator 
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 
> 2147483646 rack: null)
> 2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:1000] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Request joining group due to: need to re-join with the given member-id
> 2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:535] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> (Re-)joining group
> 2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:595] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully joined group with generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [AbstractCoordinator.java:761] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Successfully synced group in generation Generation{generationId=676, 
> memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243',
>  protocol='cooperative-sticky'}
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator 
> [ConsumerCoordinator.java:395] [Consumer 
> clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] 
> Updating assignment with
>         Assigned partitions:                       []
>         Current owned partitions:                  []
>         Added partitions (assigned - owned):       []
>         Revoked partitions (owned - assigned):     []
> 2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator

[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-24 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638225#comment-17638225
 ] 

Mikael commented on KAFKA-14362:


I can confirm that there is no duplication in this scenario if spring-kafka 
retries offset commits that have failed due to RebalanceInProgressException 
when onPartitionsRevoked() is called.
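
For anyone following along, here is a minimal sketch of the retry idea, not the 
actual spring-kafka change. To keep it runnable without kafka-clients on the 
classpath, RebalanceInProgress and OffsetCommitter are hypothetical stand-ins 
for Kafka's RebalanceInProgressException and for a call such as 
KafkaConsumer.commitSync(); the class name, method name and attempt limit are 
also assumptions made for illustration.

```java
// Hypothetical stand-in for org.apache.kafka.common.errors.RebalanceInProgressException.
class RebalanceInProgress extends RuntimeException {
    RebalanceInProgress(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the commit call, e.g. a lambda wrapping
// KafkaConsumer.commitSync(offsets) in a real application.
interface OffsetCommitter {
    void commit();
}

final class RevokeCommitSketch {
    private RevokeCommitSketch() {
    }

    /**
     * Calls committer.commit() until it succeeds or maxAttempts is used up.
     * Returns the number of attempts made; rethrows the last failure if
     * every attempt hit a rebalance-in-progress error.
     */
    static int commitWithRetry(OffsetCommitter committer, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RebalanceInProgress last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                committer.commit();
                return attempt; // commit went through
            } catch (RebalanceInProgress e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated committer: fails twice, then succeeds on the third call.
        int attempts = commitWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RebalanceInProgress("rebalance in progress");
            }
        }, 5);
        System.out.println("committed after " + attempts + " attempts");
    }
}
```

In a real listener the retry would sit inside onPartitionsRevoked(), so that 
offsets for the revoked partitions are committed before another consumer 
starts fetching from them. A production version would probably also want a 
backoff between attempts.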

The other scenario with Kafka Streams consumer duplication that I mentioned 
previously doesn't involve spring-kafka, but it also seems to be a distinctly 
different issue from the one detailed here, so I'll open a separate Jira issue 
for that.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-11-24 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638282#comment-17638282
 ] 

Mikael commented on KAFKA-14362:


Created KAFKA-14419 for the stream consumer duplication scenario.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-12-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645541#comment-17645541
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14362:


Nice find, I would not at all be surprised to find out there's a bug in how 
spring-kafka handles things (or doesn't). I saw that you filled in the Affects 
Versions but only up to 3.0; does that just mean you haven't tested beyond that 
point, or does the issue go away? Or is that just the highest version of Kafka 
available in Spring right now?

There have been many, many fixes and improvements to the rebalancing protocol 
since those older versions, and at least a few that I know of even since 3.0. I 
think we should focus on making sure there's no issue in the current/recent 
versions of Kafka (like with the Streams ticket you filed) and, if there is, 
whether it's truly a bug in Kafka or something that should be fixed in 
spring-kafka. Even if it is the latter, that's really good to know, and I'm 
happy to help review a patch for them, although I can't merge over there.


[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart

2022-12-10 Thread Mikael (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645580#comment-17645580
 ] 

Mikael commented on KAFKA-14362:


The spring-kafka defect is fixed in spring-kafka 3.0.1 and backported to 2.9.4. 
We are still using the attached patch though, as a workaround for KAFKA-14419 
until that has been resolved.
