[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629832#comment-17629832 ] Mikael commented on KAFKA-14362:

The main thing that has caught my attention is the tight loop of 'Failing OffsetCommit request since the consumer is not part of an active group' messages for the consumer that was not restarted. Could it have given up on committing the offset?

> Same message consumed by two consumers in the same group after client restart
> -----------------------------------------------------------------------------
>
> Key: KAFKA-14362
> URL: https://issues.apache.org/jira/browse/KAFKA-14362
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.1.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
> Reporter: Mikael
> Priority: Major
>
> Trigger scenario:
> Two Kafka client application instances on separate EC2 instances with one consumer each, consuming from the same 8-partition topic using the same group ID. Duplicate consumption of a handful of messages sometimes happens right after one of the application instances has been restarted.
>
> Additional information:
> Messages are produced to the topic by a Kafka Streams topology deployed on four application instances. I have verified that each message is produced only once by enabling debug logging in the topology flow right before producing each message to the topic.
>
> Example logs below are from a test run in which a batch of 11 messages was consumed at 10:28:26,771 on the restarted instance, and 9 of them were consumed as part of a larger batch at 10:28:23,824 on the other instance. Application shutdown was initiated at 10:27:13,086 and completed at 10:27:15,164; startup was initiated at 10:28:05,029 and completed at 10:28:37,491.
> Kafka consumer group logs after restart on the instance that was restarted:

{code:java}
2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.Metadata [Metadata.java:287] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Cluster ID: B7Q1-xJpQW6EGbp35t2VnA
2022-11-07 10:28:23,678 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:853] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Discovered group coordinator b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 2147483646 rack: null)
2022-11-07 10:28:23,688 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group
2022-11-07 10:28:23,736 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1000] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Request joining group due to: need to re-join with the given member-id
2022-11-07 10:28:23,737 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:535] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] (Re-)joining group
2022-11-07 10:28:23,797 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:595] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully joined group with generation Generation{generationId=676, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol='cooperative-sticky'}
2022-11-07 10:28:23,835 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:761] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Successfully synced group in generation Generation{generationId=676, memberId='consumer-xms-batch-mt-callback-3-bfceacd5-99de-44d5-9a36-eae6e5d99243', protocol='cooperative-sticky'}
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:395] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Updating assignment with
Assigned partitions: []
Current owned partitions: []
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): []
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:279] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Notifying assignor about the new Assignment(partitions=[])
2022-11-07 10:28:23,838 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:291] [Consumer cl
{code}
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630717#comment-17630717 ] A. Sophie Blee-Goldman commented on KAFKA-14362:

Hey [~Carlstedt], you're right about why this is happening: because of the restart, a rebalance is kicked off, which means that any further attempts to commit offsets by other members of the group will fail. After a rebalance, if for example a partition is reassigned from consumer A to consumer B, consumer B knows where to pick up and resume processing by seeking to the position that corresponds to the latest committed offset for that partition. If consumer A had processed a message right before the rebalance but then failed to commit an offset for it, that message will be reprocessed by consumer B: to Kafka it looks like the message was never fully processed by A, and therefore B should make sure to process it again.

In other words, this is the intended behavior. There is no guarantee that a message will only be _consumed_ once; instead, Kafka guarantees _at least once_ processing semantics. This is to make sure that every record is fully processed and handled by your application logic before moving on to the next record. Otherwise it would not be fault tolerant, i.e. you might lose a record completely if the app crashed immediately after the record was polled from the consumer.
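The resume-from-committed-offset behavior described above can be illustrated with a toy model (plain Java, not the Kafka API; the class and method names here are invented for illustration). The key point is that the committed offset is the *next* offset to read, so any records processed beyond it are redelivered to whichever consumer takes over the partition.

```java
import java.util.List;

// Toy model of at-least-once redelivery after a rebalance (not the Kafka API).
public class AtLeastOnceDemo {

    // A new partition owner resumes from the last committed offset,
    // which is defined as the next offset to read.
    static List<String> resumeFrom(List<String> partitionLog, int committedOffset) {
        return partitionLog.subList(committedOffset, partitionLog.size());
    }

    public static void main(String[] args) {
        List<String> partitionLog = List.of("m0", "m1", "m2", "m3", "m4", "m5");

        // Consumer A processed all six records, but its last successful
        // commit only covered m0..m2, so the committed offset is 3.
        int committedOffset = 3;

        // After the rebalance, consumer B seeks to offset 3 and re-reads
        // m3..m5 even though A already processed them: duplicates, but no loss.
        System.out.println(resumeFrom(partitionLog, committedOffset)); // [m3, m4, m5]
    }
}
```

Had the log resumed at offset 6 instead, a crash between processing and committing would have silently dropped m3..m5, which is the at-most-once failure mode Kafka's design deliberately avoids.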
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631265#comment-17631265 ] Mikael commented on KAFKA-14362:

I would have thought that an orderly rebalance wouldn't cause any duplication. I understand that an uncontrolled restart can cause duplication, but in this case a consumer just leaves the group and then joins it again later. Surely it can't be expected behaviour to randomly duplicate messages in that use case? It's also strange that the presumable offset commit failure is not reported as a warning- or error-level log entry.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17632082#comment-17632082 ] A. Sophie Blee-Goldman commented on KAFKA-14362:

{quote}I would have thought that an orderly rebalance wouldn't cause any duplication{quote}
In general it shouldn't, actually: even if offset commits fail or are prevented while the rebalance is in progress, when partitions are migrated from one consumer to another the original owner gets the chance to finish committing as it revokes those partitions, i.e. during the ConsumerRebalanceListener#onPartitionsRevoked callback.

Now that I think of it, that's probably the issue you're experiencing: not committing offsets on revocation. If you use the default rebalance listener, i.e. you don't explicitly pass one in when subscribing to topics, Kafka falls back to a no-op listener that does not commit in the callback. To be honest this feels like a poor design decision, but it comes down to a core design philosophy: guarantee that every record gets fully processed "at least once", rather than "at most once".

Anyways, if you're not already using a rebalance listener to commit offsets when partitions are revoked, I would definitely start there.
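A minimal sketch of the commit-on-revocation pattern suggested above, assuming the standard kafka-clients API (consumer configuration is elided and the topic name is invented; this is an illustration, not a complete application):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevoke {
    public static void main(String[] args) {
        Properties props = new Properties();
        // ...bootstrap.servers, group.id, deserializers, enable.auto.commit=false...
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Next offset to commit per partition, updated as records are processed.
        Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();

        consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Flush pending offsets while this consumer still owns the
                // partitions, so the next owner resumes where we left off.
                consumer.commitSync(pending);
                pending.clear();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to do: positions are restored from committed offsets.
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // ...process the record, then mark the next offset as committable.
                pending.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            consumer.commitSync(pending);
            pending.clear();
        }
    }
}
```

Note the committed offset is record.offset() + 1, the next offset to read, matching how the new partition owner seeks after a rebalance.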
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633763#comment-17633763 ] Mikael commented on KAFKA-14362:

We are using KafkaMessageListenerContainer from spring-kafka, which registers a ConsumerRebalanceListener that commits all offsets in its onPartitionsRevoked() method. I have now changed the application logic to always commit offsets synchronously in the same thread that calls Consumer.poll(), but the duplication still happens.

While trying to understand the Kafka consumer source code, I have one doubt: although no new records are fetched from a partition that has been revoked, what happens to records from revoked partitions that have already been fetched but not yet returned by Consumer.poll()? Are they filtered out of the next poll? Otherwise I imagine they could result in duplication.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635370#comment-17635370 ] Mikael commented on KAFKA-14362:

When comparing successful test runs with those that generate duplicate consumption of messages, I noticed that in the success case, for the consumer that gets some of its partitions revoked, there is FIRST this log message:

{code:java}
2022-11-17 07:27:27,141 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:395] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Updating assignment with
Assigned partitions: [messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1]
Current owned partitions: [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-3, messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-0, messages.xms.mt.batch.report-2, messages.xms.mt.batch.report-1, messages.xms.mt.batch.report-7]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [messages.xms.mt.batch.report-4, messages.xms.mt.batch.report-6, messages.xms.mt.batch.report-5, messages.xms.mt.batch.report-7]
{code}

and THEN a large number of log messages of this type:

{code:java}
2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:1156] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Failing OffsetCommit request since the consumer is not part of an active group
{code}

whereas in the duplication case, at least one of the latter messages is logged BEFORE the former.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635833#comment-17635833 ] A. Sophie Blee-Goldman commented on KAFKA-14362:

Do you ever see anything about partitions being "lost"? Or rebalances failing/needing to be retried or restarted in the duplicates case?
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635921#comment-17635921 ] Mikael commented on KAFKA-14362: No, no errors or warnings related to rebalancing. I'm currently trying to work around it by implementing a ConsumerRebalanceListener and validating that a batch doesn't contain any records for revoked partitions before consuming it.
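The workaround described above can be sketched roughly as follows. This is a hypothetical illustration, not the reporter's actual code: partitions are modeled as plain "topic-partition" strings rather than org.apache.kafka.common.TopicPartition, and the class name RevocationTrackingListener is made up, so the filtering logic runs without a broker or the kafka-clients jar.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the workaround: remember which partitions were revoked in the
// rebalance listener, then drop any records for those partitions from a
// polled batch before processing it. In a real application these callbacks
// would be invoked from ConsumerRebalanceListener#onPartitionsRevoked and
// #onPartitionsAssigned.
class RevocationTrackingListener {
    private final Set<String> revoked = new HashSet<>();

    public synchronized void onPartitionsRevoked(Set<String> partitions) {
        revoked.addAll(partitions);
    }

    public synchronized void onPartitionsAssigned(Set<String> partitions) {
        revoked.removeAll(partitions);
    }

    // Keep only records whose partition has not been revoked from this consumer.
    public synchronized Map<String, List<String>> filter(Map<String, List<String>> batch) {
        Map<String, List<String>> safe = new HashMap<>();
        for (Map.Entry<String, List<String>> e : batch.entrySet()) {
            if (!revoked.contains(e.getKey())) {
                safe.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        return safe;
    }
}
```

As the later comments show, this check alone turned out not to be sufficient, because the failing commit happens before onPartitionsRevoked() fires.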
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636116#comment-17636116 ] A. Sophie Blee-Goldman commented on KAFKA-14362: Well, regardless of what is happening here, failures obviously do happen, so your handling logic needs to account for the possibility of re-consuming offsets anyway. If you're interested in an example of how to implement exactly-once semantics, Kafka Streams does exactly this via transactional producers; there are probably hundreds of docs and blog posts on how it works by now. But of course this only helps/works when the record processing terminates in another topic.
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636182#comment-17636182 ] Mikael commented on KAFKA-14362: The workaround didn't help. It seems that the first failing offset commit happens before onPartitionsRevoked() is called, due to a rebalance in progress. But if a rebalance is in progress, preventing offsets from being committed, shouldn't KafkaConsumer.poll() return an empty list rather than records whose offsets can't be committed after consumption?
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636398#comment-17636398 ] A. Sophie Blee-Goldman commented on KAFKA-14362: An offset commit failing due to an ongoing rebalance shouldn't in itself cause problems/duplicates, since the commit can be retried when the rebalance is over (or in the #onPartitionsRevoked callback). The consumer can and will continue to poll for more records and return them during a rebalance so it can keep processing while the rebalance is in progress; this behavior was added with KIP-429 as part of cooperative rebalancing. If you really want to, you can turn it off by switching to a different partition assignor (e.g. the plain StickyAssignor). However, it's probably best to figure out why this is actually happening first, and going back to stop-the-world rebalances is a real cost. On that note, I think I finally understand why this is happening: it sounds like you aren't updating the offsets to be committed after additional records are processed during the rebalance. Why not? Why do you think these records "can't be committed after consumption"? If a partition was revoked, you should commit its offsets in the #onPartitionsRevoked callback; any other partition can be committed when the rebalance is over. In both cases you of course need to make sure you're committing offsets all the way up to the last consumed/processed message.
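For reference, opting out of cooperative rebalancing as mentioned in the comment above is a consumer configuration change. A minimal sketch (the property key is the standard Kafka consumer config; the logs in this issue show the group currently on protocol='cooperative-sticky'):

```properties
# Replace the cooperative-sticky protocol with the eager sticky assignor.
# Under the eager (stop-the-world) protocol all partitions are revoked at
# the start of every rebalance, so no records are processed mid-rebalance.
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
```

Note that switching protocols on a live group requires a rolling procedure; consult the Kafka upgrade documentation before changing this in production.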
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636769#comment-17636769 ] Mikael commented on KAFKA-14362: I think I have identified the root cause of the duplication: KafkaConsumer.poll(Duration) returns records from a previous fetch immediately, even when a rebalance has been initiated. Any offset commit before the rebalance completes will fail, and if any of the committed offsets belong to a partition that is subsequently revoked from the consumer, the offset commit will also fail if retried after the rebalance completes (which is what spring-kafka does). By contrast, the deprecated method KafkaConsumer.poll(long) awaits completion of the rebalance before returning data. I reran my application restart test using a spring-kafka snapshot which calls KafkaConsumer.poll(long) instead of KafkaConsumer.poll(Duration), and didn't see any duplication when repeating the test 10 times. When I ran the test previously it resulted in duplication 2-3 times out of 10. As expected, I am also no longer seeing these logs after the change: {code:java} 2022-11-17 07:27:27,149 INFO [consumer-0-C-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:1156] [Consumer clientId=consumer-xms-batch-mt-callback-3, groupId=xms-batch-mt-callback] Failing OffsetCommit request since the consumer is not part of an active group {code} The issue appears to be that already-fetched records are returned immediately, before it is known whether the rebalance will revoke the partitions those records belong to, which makes those records potentially stale. The deprecated method, in contrast, waits indefinitely for the rebalance to complete. Couldn't there instead be a timeout on waiting for rebalance completion?
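The failure mode described above (commits rejected while a rebalance is in flight) is usually handled by retrying the commit after the rebalance. A hypothetical sketch of that retry loop follows; the RebalanceInProgress class here is a local stand-in for Kafka's rebalance-in-progress commit failure, and the commit itself is injected as a Supplier, so the logic runs without a broker or the kafka-clients jar.

```java
import java.util.function.Supplier;

// Sketch of bounded retry around an offset commit that can fail because a
// rebalance is in progress. In a real consumer the Supplier would wrap
// KafkaConsumer#commitSync and the stand-in exception would be the
// rebalance-in-progress failure raised by the client.
class CommitRetry {
    static class RebalanceInProgress extends RuntimeException {}

    // Attempt the commit up to maxAttempts times; returns true on success,
    // false if every attempt hit a rebalance in progress.
    public static boolean commitWithRetry(Supplier<Boolean> commit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return commit.get();
            } catch (RebalanceInProgress e) {
                // Rebalance still running: fall through and retry. A real
                // implementation would also back off between attempts.
            }
        }
        // Give up: offsets stay uncommitted and may be re-consumed elsewhere.
        return false;
    }
}
```

The thread's point is that even this retry fails for partitions that were revoked in the meantime, which is why the later comments move the retry into onPartitionsRevoked().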
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637614#comment-17637614 ] Mikael commented on KAFKA-14362: This patch in kafka-clients fixed the duplication issue with my application restart test case: [^KAFKA-14362_Do_not_return_any_records_from_KafkaConsumer_poll()_while_rebalance_is_in_prog.patch] I've now run my restart test 20 times in a row without any duplication. The difference in behaviour is that KafkaConsumer.poll(Duration) waits until the poll timeout and returns no records if a rebalance is still in progress, instead of immediately returning any remaining records from the previous fetch. Without this change I have actually seen duplication both with streams consumers and normal consumers.
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637682#comment-17637682 ] Mikael commented on KAFKA-14362: Hang on, I just spotted something in spring-kafka: offset commits that failed due to a rebalance in progress are not retried in the onPartitionsRevoked() callback. I'm going to try a modified spring-kafka build and see if that fixes the issue.
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638225#comment-17638225 ] Mikael commented on KAFKA-14362: I can confirm that there is no duplication in this scenario if spring-kafka retries offset commits that have failed due to RebalanceInProgressException when onPartitionsRevoked() is called. The other scenario with Kafka streams consumer duplication that I mentioned previously doesn't involve spring-kafka, but it also seems to be a distinctly different issue from the one detailed here, so I'll open a separate Jira issue for that.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638282#comment-17638282 ] Mikael commented on KAFKA-14362: Created KAFKA-14419 for the stream consumer duplication scenario.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645541#comment-17645541 ] A. Sophie Blee-Goldman commented on KAFKA-14362: Nice find, I would not at all be surprised to find out there's a bug in how spring-kafka handles things (or doesn't). I saw that you filled in the Affects Versions but only up to 3.0, does that just mean you haven't tested beyond that point, or does the issue go away? Or is that just the highest version of Kafka available in spring right now? There have been many, many fixes and improvements to the rebalancing protocol since those older versions, and at least a few that I know of even since 3.0. I think we should focus on making sure there's no issue in the current/recent versions of Kafka (like with the Streams ticket you filed) and, if there is, whether it's truly a bug in Kafka or something that should be fixed in spring-kafka. Even if it is the latter, that's really good to know, and I'm happy to help review a patch for them, although I can't merge over there.
[jira] [Commented] (KAFKA-14362) Same message consumed by two consumers in the same group after client restart
[ https://issues.apache.org/jira/browse/KAFKA-14362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645580#comment-17645580 ] Mikael commented on KAFKA-14362: The spring-kafka defect is fixed in spring-kafka 3.0.1 and backported to 2.9.4. We are still using the attached patch though, as a workaround for KAFKA-14419 until that has been resolved.