Kamil Adam Nowak created KAFKA-18024:
----------------------------------------
Summary: ConcurrentModificationException in Kafka OffsetFetcher - Proposal for Thread-Safety Fix
Key: KAFKA-18024
URL: https://issues.apache.org/jira/browse/KAFKA-18024
Project: Kafka
Issue Type: Bug
Reporter: Kamil Adam Nowak
Assignee: Kamil Adam Nowak
I am using Kafka (specifically kafka-clients.jar version 3.5.1).
I am experiencing an issue that, based on the code, has not been fixed in newer versions.
While my microservice is running, the following error appears:
{code}
[2024-11-12T10:57:37.997+02:00] [ERROR] [myservice.kafka.consumer.handler.KafkaConsumerHandler] [dataChangeEventConsumer_0] [] [] [null] [:
java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.remove(HashMap.java:1507)
    at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:420)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
]
{code}
These are new logs from 2024-11-12, but I had the same issue in another
microservice a few months ago, on 2024-07-05:
{code}
2024-07-05 01:11:11,341 [ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [kafka-coordinator-heartbeat-thread | myServiceResult_consumer] [] [] [Consumer clientId=myServiceResult_consumer_0, groupId=myServiceResult_consumer] Heartbeat thread failed due to unexpected error:
java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
    at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1516)
    at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:419)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
{code}
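It is the same error, logged from a different place (another service and logger; the second time it is thrown from `HashIterator.nextNode()` instead of `HashIterator.remove()`). The source is the same: a ConcurrentModificationException from OffsetFetcher, line 228:
https://github.com/apache/kafka/blob/3.5.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L228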
{code}
...
219        Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
220        do {
221            RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
222
223            future.addListener(new RequestFutureListener<ListOffsetResult>() {
224                @Override
225                public void onSuccess(ListOffsetResult value) {
226                    synchronized (future) {
227                        result.fetchedOffsets.putAll(value.fetchedOffsets);
228                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
...
{code}
So the issue is the call:
{code}
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
{code}
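To make the effect of line 228 concrete: `keySet()` on a `HashMap` is a live view of the map, so `retainAll` on it removes the non-matching entries from `remainingToSearch` itself. The following is only a minimal sketch; the class name, the `String` keys (a hypothetical stand-in for `TopicPartition`) and the retry set are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class RetainAllViewDemo {
    // Mirrors the shape of line 228: keep only the partitions that still need a retry.
    // String keys are a hypothetical stand-in for TopicPartition.
    static Map<String, Long> retainRetries(Map<String, Long> timestampsToSearch, Set<String> partitionsToRetry) {
        Map<String, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
        // keySet() is backed by the map: removing a key from the view removes the entry from the map.
        remainingToSearch.keySet().retainAll(partitionsToRetry);
        return remainingToSearch;
    }

    public static void main(String[] args) {
        Map<String, Long> timestamps = Map.of("topic-0", 100L, "topic-1", 200L, "topic-2", 300L);
        Map<String, Long> remaining = retainRetries(timestamps, Set.of("topic-1"));
        System.out.println(remaining); // {topic-1=200}
    }
}
```

So the listener is not just reading the map: every successful response structurally modifies it.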
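on `remainingToSearch`, which is a regular `HashMap`. `.keySet()` does not return an independent `Set` but a view backed by the map, and `retainAll` walks that view with the map's fail-fast iterator, removing keys from the map as it goes. That iterator is not thread-safe: when it detects that the map was changed by something other than itself, it throws `ConcurrentModificationException` on a best-effort basis (from `nextNode()` or `remove()`, matching the two traces). Both traces show the listener running on the consumer's heartbeat thread (`AbstractCoordinator$HeartbeatThread.run`), which indicates that `remainingToSearch` is accessed from more than one thread without a common lock. The existing `synchronized (future)` block only excludes code that also synchronizes on the same `future` object.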
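The race itself cannot be reproduced deterministically in a short snippet. The sketch below (hypothetical class name and keys) therefore triggers the same fail-fast check on a single thread: it starts iterating over `HashMap.keySet()`, changes the map behind the iterator's back, as another thread would, and then calls either `next()` or `remove()`, the two paths seen in the traces.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class FailFastDemo {
    // Starts an iteration, modifies the map outside the iterator, then continues the iteration.
    static String failFastCheck(boolean viaRemove) {
        Map<String, Long> map = new HashMap<>(Map.of("topic-0", 100L, "topic-1", 200L));
        Iterator<String> it = map.keySet().iterator();
        it.next();                 // iteration has started, as inside retainAll
        map.put("topic-2", 300L);  // structural change not made through the iterator (stands in for another thread)
        try {
            if (viaRemove) {
                it.remove();       // same path as the 2024-11-12 trace (HashIterator.remove)
            } else {
                it.next();         // same path as the 2024-07-05 trace (HashIterator.nextNode)
            }
            return "no exception";
        } catch (ConcurrentModificationException e) {
            return "ConcurrentModificationException";
        }
    }

    public static void main(String[] args) {
        System.out.println(failFastCheck(true));  // ConcurrentModificationException
        System.out.println(failFastCheck(false)); // ConcurrentModificationException
    }
}
```

In the real failure the modification comes from another thread, so the exception appears only intermittently, which fits the months between the two occurrences.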
I believe it would be good to make the code thread-safe to prevent this error.
My proposal is to replace:
{code}
Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
{code}
with:
{code}
Map<TopicPartition, Long> remainingToSearch = new ConcurrentHashMap<>(timestampsToSearch);
{code}
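`ConcurrentHashMap` returns a thread-safe `KeySetView` when `.keySet()` is called. Its iterators are weakly consistent: they never throw `ConcurrentModificationException`, even if the map is modified while they are in use.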
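A minimal sketch of why the proposal removes the exception. It does not test the Kafka code itself; the class, method and key names are hypothetical. The first method repeats the "iterate, then write to the map" sequence that makes a `HashMap` iterator fail; the second runs `keySet().retainAll(...)` on one thread while another thread keeps adding and removing keys, and records any exception thrown.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

final class ConcurrentKeySetDemo {
    // Write to the map while an iteration is in progress; a weakly consistent iterator keeps going.
    static boolean survivesWriteDuringIteration() {
        Map<String, Long> map = new ConcurrentHashMap<>(Map.of("topic-0", 100L, "topic-1", 200L));
        Iterator<String> it = map.keySet().iterator();
        it.next();
        map.put("topic-2", 300L); // no ConcurrentModificationException here or below
        it.remove();              // removes the key returned by the last next()
        while (it.hasNext()) {
            it.next();
        }
        return map.size() == 2;
    }

    // One thread calls keySet().retainAll(...) repeatedly while another adds and removes keys.
    // Returns the first exception seen on either thread, or null if there was none.
    static Throwable stress(int rounds) throws InterruptedException {
        Map<String, Long> map = new ConcurrentHashMap<>();
        Set<String> keep = Set.of("k0", "k1");
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    map.put("k" + (i % 8), (long) i);
                    map.remove("k" + ((i + 4) % 8));
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        writer.start();
        try {
            for (int i = 0; i < rounds; i++) {
                map.keySet().retainAll(keep);
            }
        } catch (Throwable t) {
            failure.compareAndSet(null, t);
        }
        writer.join();
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(survivesWriteDuringIteration()); // true
        System.out.println(stress(100_000));                // null
    }
}
```

Two caveats about this design choice: weakly consistent iteration only removes the exception, so a key written concurrently may or may not be seen by `retainAll`; and unlike `HashMap`, `ConcurrentHashMap` rejects `null` keys and values, so the copy constructor would throw `NullPointerException` if `timestampsToSearch` ever contained one.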
Based on the code, OffsetFetcher.java in newer versions of kafka-clients.jar still behaves the same way as in version 3.5.1, which I use, so I expect the same error can occur there.
Therefore, I suggest applying this fix starting from the latest version.