[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman updated KAFKA-14419: ------------------------------------------- Summary: Failed SyncGroup leading to partitions lost due to processing during rebalances (was: Same message consumed again by the same stream task after partition is lost and reassigned) > Failed SyncGroup leading to partitions lost due to processing during > rebalances > ------------------------------------------------------------------------------- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 > Reporter: Mikael > Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. > Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as > lost since generation/memberID has been reset,indicating that consumer is in > old state or no longer part of the group > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions > messages.xms.mt.batch.enqueue.sms-1 > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due > to missed rebalance. 
> lost active tasks: [0_1] > lost assigned standby tasks: [] > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Suspended RUNNING > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Suspended running > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, > groupId=null] Unsubscribed all topics or patterns and assigned partitions > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Closing record collector dirty > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Closed dirty > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > partitions lost took 19 ms. > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance > failed due to 'The group is rebalancing, so a rejoin is needed.' 
> (RebalanceInProgressException) > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > 2022-11-21 15:09:35,391 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8018, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:35,395 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation > Generation{generationId=8018, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Updating assignment with > Assigned partitions: > [messages.xms.mt.batch.enqueue.sms-1] > Current owned partitions: [] > Added partitions (assigned - owned): > [messages.xms.mt.batch.enqueue.sms-1] > Revoked partitions (owned - assigned): [] > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new > Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52) > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] > stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] > No followup rebalance was requested, resetting the rebalance schedule. 
> 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > Handle new assignment with: > New active tasks: [0_1] > New standby tasks: [] > Existing active tasks: [] > Existing standby tasks: [] > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: > messages.xms.mt.batch.enqueue.sms-1 > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > State transition from RUNNING to PARTITIONS_ASSIGNED > 2022-11-21 15:09:35,398 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Setting offset for partition > messages.xms.mt.batch.enqueue.sms-1 to the committed offset > FetchPosition{offset=26744389, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 > (id: 1 rack: use1-az6)], epoch=19}} > 2022-11-21 15:09:35,444 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Initialized > 2022-11-21 15:09:35,445 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Restored and ready to run > 2022-11-21 15:09:35,445 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > Restoration took 49 ms for all tasks [0_1] > 2022-11-21 15:09:35,445 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING > 2022-11-21 15:09:35,446 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State > transition from REBALANCING to RUNNING > 2022-11-21 15:09:35,446 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > 
groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for > messages.xms.mt.batch.enqueue.sms-1 in order to compute lag > Same input records consumed for the second time{code} > Streams consumer configuration: > {noformat} > allow.auto.create.topics = false > auto.commit.interval.ms = 5000 > auto.offset.reset = earliest > bootstrap.servers = > [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, > b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, > b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = > messages.xms.mms.mt-05bfc9d3-7f4b-48d4-9c8c-cf9d3e496fef-StreamThread-1-consumer > client.rack = > connections.max.idle.ms = 540000 > default.api.timeout.ms = 60000 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = messages.xms.mms.mt > group.instance.id = null > heartbeat.interval.ms = 1500 > interceptor.classes = [] > internal.leave.group.on.close = true > internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_committed > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 300000 > max.poll.records = 1000 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partition.assignment.strategy = > [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.login.retry.backoff.max.ms = 10000 > sasl.login.retry.backoff.ms = 100 > sasl.mechanism = GSSAPI > sasl.oauthbearer.clock.skew.seconds = 30 > sasl.oauthbearer.expected.audience = null > sasl.oauthbearer.expected.issuer = null > sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000 > sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000 > sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 > sasl.oauthbearer.jwks.endpoint.url = null > sasl.oauthbearer.scope.claim.name = scope > sasl.oauthbearer.sub.claim.name = sub > sasl.oauthbearer.token.endpoint.url = null > security.protocol = SSL > security.providers = null > send.buffer.bytes = 131072 > session.timeout.ms = 6000 > socket.connection.setup.timeout.max.ms = 30000 > socket.connection.setup.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.3] > ssl.endpoint.identification.algorithm = https > ssl.engine.factory.class = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.certificate.chain = null > ssl.keystore.key = null > ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks > ssl.keystore.password = 
[hidden] > ssl.keystore.type = JKS > ssl.protocol = TLSv1.3 > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.certificates = null > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer{noformat} > > The message about the lost partition that is highlighted in red above only occurs > when messages are consumed twice, which happens roughly two times out of ten > in my application restart test scenario. > This issue no longer occurs when the patch suggested in KAFKA-14362 is > applied.
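> For illustration, a minimal Streams setup of the kind described above, with the consumer-level values taken verbatim from the configuration dump (session.timeout.ms = 6000, heartbeat.interval.ms = 1500, max.poll.records = 1000). This is a hypothetical sketch, not the actual topology from this deployment: the application id, bootstrap servers and topic name are placeholders, and num.standby.replicas = 1 is only an assumption inferred from the "8 active and 8 standby stream tasks" description.
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class EnqueueTopologySketch {
>
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example.enqueue.app"); // placeholder application/group id
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
>         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);               // assumption: 8 active + 8 standby tasks
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
>
>         // Consumer-level overrides, values copied from the configuration dump above.
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 6000);
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1500);
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
>
>         StreamsBuilder builder = new StreamsBuilder();
>         // One source topic with 8 partitions -> 8 active tasks spread over 4 instances.
>         builder.stream("example-input")                // placeholder topic name
>                .foreach((key, value) -> {
>                    // processing step; records handled here right before a failed
>                    // SyncGroup are the ones seen again after the partitions are
>                    // lost and reassigned
>                });
>
>         KafkaStreams streams = new KafkaStreams(builder.build(), props);
>         streams.start();
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>     }
> }
> {code}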
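> The duplicate consumption itself is ordinary at-least-once redelivery once the partitions are declared lost: records that were already processed could not have their offsets committed after the generation was reset, the task was closed dirty, and the records were fetched again from the last committed offset (the "Setting offset for partition ... to the committed offset" line above). Below is a plain-consumer sketch of that window, as an analogy only and not the Streams-internal code path; topic, group id and servers are made up.
> {code:java}
> import java.time.Duration;
> import java.util.Collection;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class RedeliveryWindowSketch {
>
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
>         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets committed manually
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("example-input"), new ConsumerRebalanceListener() {
>                 @Override
>                 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                     // normal revocation path: offsets could still be committed here
>                 }
>
>                 @Override
>                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
>
>                 @Override
>                 public void onPartitionsLost(Collection<TopicPartition> partitions) {
>                     // counterpart of "Lost previously assigned partitions": the generation
>                     // was reset, so nothing processed since the last commit can be
>                     // committed; those records will be delivered again after reassignment
>                     System.out.println("Partitions lost: " + partitions);
>                 }
>             });
>
>             while (true) {
>                 for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
>                     process(record);   // side effects happen here first ...
>                 }
>                 consumer.commitSync(); // ... but if the partitions are lost before this commit,
>                                        // the processed offsets are never stored -> duplicates
>             }
>         }
>     }
>
>     private static void process(ConsumerRecord<byte[], byte[]> record) {
>         // application-specific processing (placeholder)
>     }
> }
> {code}
> In Streams the commit is driven by the runtime rather than user code, but the window is the same one visible in the log: the task that was "Closed dirty" had processed records whose offsets were never committed, so they reappear after the rejoin in generation 8018.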