Hi Team,
We are seeing some odd behaviour with CooperativeStickyAssignor: during a
rebalance, a consumer receives a new partition and keeps its old ones.
Nevertheless, a CommitFailedException is raised for one of the old partitions,
and the affected offsets are not retried. The consumer goes straight to the
next batch and so skips some offsets. The partition in question is
receivedPartition: [27]. As you can see, offset 21551049 is received, then the
exception occurs, and the next offset received is 21551051. How can we handle
this? Logs (most recent entry first, going by the message timestamps):
INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;Received kafka message for topic: [picking-job-v1], key: [d52e171e-7a53-4e8c-8f4a-220d8e493237], offset: [21551051], receivedPartition: [27], time: 1725975726868, publishedOn: 1725975726848

INFO;org.springframework.kafka.listener.KafkaMessageListenerContainer;productrecall-pickingjob-message-consumer: partitions assigned: [picking-job-v1-20]

INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;onPartitionsAssigned was called for topic: picking-job-v1

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Setting offset for partition picking-job-v1-20 to the committed offset FetchPosition{offset=21471231, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker-12.ffp-prd.kafka-private.rewe.cloud:9093 (id: 12 rack: europe-west1-d)], epoch=408}}

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Adding newly assigned partitions: picking-job-v1-20

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Notifying assignor about the new Assignment(partitions=[picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51, picking-job-v1-20])

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;"[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Updating assignment with
    Assigned partitions: [picking-job-v1-20, picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51]
    Current owned partitions: [picking-job-v1-27, picking-job-v1-37, picking-job-v1-44, picking-job-v1-51]
    Added partitions (assigned - owned): [picking-job-v1-20]
    Revoked partitions (owned - assigned): []"

ERROR;org.springframework.kafka.listener.KafkaMessageListenerContainer;"Consumer exception: java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1966)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1382)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1351)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1188)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1450)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3300)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3295)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkRebalanceCommits(KafkaMessageListenerContainer.java:1710)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1658)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1439)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1330)
    ... 2 common frames omitted"

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Successfully joined group with generation Generation{generationId=2222, memberId='consumer-productrecall-pickingjob-message-consumer-34-50a678c9-ccd6-4610-b606-f31093711e48', protocol='cooperative-sticky'}

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group

INFO;com.rewe.digital.mystique.kafka.consumer.listener.StringKafkaListener;Received kafka message for topic: [picking-job-v1], key: [a4ef4bf4-0028-4f32-9ffa-c78b44886167], offset: [21551049], receivedPartition: [27], time: 1725975724934, publishedOn: 1725975724734

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group

INFO;org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;[Consumer clientId=consumer-productrecall-pickingjob-message-consumer-34, groupId=productrecall-pickingjob-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group
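
For reference, this is a minimal sketch of how the gap described above (offset 21551049 followed directly by 21551051 on partition 27) could be made visible inside a listener. It is not a fix for the rebalance problem, and it uses no Kafka or Spring API: `OffsetGapDetector`, `record` and `forget` are hypothetical names introduced only for illustration. Note that a gap is not always a lost record, since compacted topics and transaction markers can also leave offsets that a consumer never sees.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Kafka or Spring): remembers the last
 * offset seen per partition and reports how many offsets were skipped.
 */
final class OffsetGapDetector {
    private final Map<Integer, Long> lastSeen = new HashMap<>();

    /**
     * Records an offset for a partition.
     * Returns the number of offsets skipped since the previous record on
     * that partition, or 0 for the first record or a redelivery.
     */
    long record(int partition, long offset) {
        Long previous = lastSeen.put(partition, offset);
        if (previous == null || offset <= previous) {
            return 0L; // first record on this partition, or a replayed offset
        }
        return offset - previous - 1;
    }

    /** Forgets a partition, e.g. once it has been revoked from this consumer. */
    void forget(int partition) {
        lastSeen.remove(partition);
    }
}
```

Fed with the two messages from the logs, `record(27, 21551049L)` returns 0 and the following `record(27, 21551051L)` returns 1, which corresponds to the missing offset 21551050.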