James,

If I were you I would start investigating what is causing this network drops between your cluster and your consumers. The following messages are some indications of this:

* "Offset commit failed on partition MyTopic-53 at offset 957: The request *timed out*."

* "Caused by: org.apache.kafka.common.errors.*DisconnectException*"

* "Group coordinator b-2.redacted.amazonaws.com:9094<http://2.redacted.amazonaws.com:9094> (id: redacted rack: null) *is unavailable* or invalid, will attempt rediscovery"

In Kafka, the group coordinator is one of the brokers that receives heartbeats and pull requests from consumers. Heartbeats are used to detect when a consumer is no longer available; whereas pull requests are literally the pull requests sent by consumers. Regardless, when no heartbeats are detected from a given period the group coordinator consider the consumer dead and triggers and rebalance where the partitions will be reassigned. If the group coordinator is no longer available (as described in one of the error messages) then this whole process becomes stale.

Moreover, `commitAsync()` calls as the name implies are asynchronous and doesn't block the consumer thread until an response is sent from the cluster. However, if this response never comes then it will count towards the amount of time specified in the property `max.poll.interval.ms` which if maxed out will trigger the consumer to leave the consumer group. Again, it all boils down to how fast the network is enabling all of this without taking to much time.

Since you are using AWS MSK then you can use AWS native tools (such as CloudWatch, VPC logs, and the AWSSupport-SetupIPMonitoringFromVPC) to better troubleshoot these networking issues. I would also file a support ticket against the MSK service since some of these networking issues has to do with one of the brokers being unavailable -- something that is not supposed to happen.

Thanks,

-- Ricardo

On 6/18/20 9:18 PM, James Olsen wrote:
We are using AWS MSK with Kafka 2.4.1 (and same client version), 3 Brokers.  We 
are seeing fairly frequent consumer offset commit fails as shown in the example 
logs below.  Things continue working as they are all retriable, however I would 
like to improve this situation.

The issue occurs most often on the Consumer processing our busiest partition 
(MyTopic-50 in the case below).

We are using KafkaConsumer::commitAsync to manage the offsets and calling it 
after processing all the records in a given poll - probably mostly one message 
per poll and around 10 messages per second.  Doesn't seem like a heavy load and 
the consumer itself is keeping up fine.

The consumer is processing 10 Partitions on the Topic, most of which have not 
changed, e.g. in the logs below the first message refers to MyTopic-53 at 
offset 957, which actually hadn't changed for several minutes.

I note the the standard auto-commit-offsets functionality throttles the commit 
to once every 5 seconds by default.

Are we expecting too much to do commitAsync each time as we do?  We could build 
in a throttling like auto-commit does.
Is it possible that the unchanged partition offsets that commitAsync sends is 
creating unnecessary load? We could use the version of commitAsync that takes 
the map of offsets and only commit the ones we know have changed.
Does auto-commit already optimise to send only changed offsets?  If so we could 
consider switching to auto-commit.

Any advice or thoughts on the best option is appreciated.

Example logs...

2020-06-18 23:53:01,225 WARN  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit failed on partition MyTopic-53 at 
offset 957: The request timed out.

2020-06-18 23:53:01,225 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 'pool-5-thread-4' 
[Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] 
Group coordinator 
b-2.redacted.amazonaws.com:9094<http://2.redacted.amazonaws.com:9094> (id: 
redacted rack: null) is unavailable or invalid, will attempt rediscovery

2020-06-18 23:53:01,225 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419049, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed 
out.

2020-06-18 23:53:01,353 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 'pool-5-thread-4' 
[Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] 
Discovered group coordinator 
b-2.redacted.amazonaws.com:9094<http://b-2.redacted.amazonaws.com:9094> (id: 
redacted rack: null)

2020-06-18 23:53:01,355 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419050, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

2020-06-18 23:53:01,355 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419051, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metadata=''}} failed: 
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException

... plus many more with increasing 50=OffsetAndMetadata{offset=xxx ...} (the 
busy Partition).

Thanks, James.

Reply via email to