GitHub user slaunay opened a pull request: https://github.com/apache/kafka/pull/3907
KAFKA-4950: Fix ConcurrentModificationException on assigned-partitions metric

Code change:
- prevent a `java.util.ConcurrentModificationException`, caused by a race condition, from being thrown when the consumer coordinator `assigned-partitions` metric value is fetched from a `MetricsReporter` (e.g. a reporter that periodically exports metrics from a separate thread), by using a volatile field to store the number of assigned partitions:
```
java.util.ConcurrentModificationException: null
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
    at java.util.HashSet.<init>(HashSet.java:119)
    at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:880)
    ...
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
- new unit test to reproduce the issue and detect potential future regressions

I am using a volatile field on `SubscriptionState` rather than changing the `PartitionStates.map` field to a thread-safe `LinkedHashMap` alternative, to avoid bringing an unnecessary concurrent structure to other components that rely on `PartitionStates`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/slaunay/kafka bugfix/KAFKA-4950-cme-assigned-partitions-metric

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3907.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3907

----
commit 3787ac4c6070d0d4271cac27d9b43b2fdbe8b78f
Author: Sebastien Launay <sebast...@opendns.com>
Date:   2017-09-15T23:26:44Z

    KAFKA-4950; Fix CME on assigned-partitions metric
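The pattern described in this PR can be illustrated with a minimal sketch. It is not the actual patch: the class `AssignmentTracker` and its methods are hypothetical, and partitions are modelled as plain strings. The non-thread-safe `LinkedHashMap` is mutated only by the owning (consumer) thread, which republishes its size through a `volatile int` after each change. The reporter thread reads only that field and never iterates the map, so it cannot hit the fail-fast iterator seen in the stack trace.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for consumer assignment state (NOT Kafka's
// SubscriptionState). Partitions are plain strings and offsets are dummy longs.
public class AssignmentTracker {
    // Not thread-safe: only the owning (consumer) thread may touch this map.
    private final Map<String, Long> assignment = new LinkedHashMap<>();

    // Republished after every mutation; any thread may read it.
    private volatile int numAssigned = 0;

    // Replaces the assignment; call only from the owning thread.
    public void assign(Collection<String> partitions) {
        assignment.clear();
        for (String p : partitions) {
            assignment.put(p, 0L);
        }
        numAssigned = assignment.size(); // volatile write publishes the new count
    }

    // Copies the keys by iterating the map, so it is only safe on the owning thread.
    public Set<String> assignedPartitions() {
        return new HashSet<>(assignment.keySet());
    }

    // Safe from a reporter thread: a single volatile read, no iteration.
    public int numAssignedPartitions() {
        return numAssigned;
    }

    public static void main(String[] args) throws InterruptedException {
        AssignmentTracker tracker = new AssignmentTracker();
        AtomicBoolean sawUnexpected = new AtomicBoolean(false);

        // Plays the role of a MetricsReporter polling the gauge on its own thread.
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(() -> {
            int n = tracker.numAssignedPartitions();
            if (n != 0 && n != 1 && n != 3) {
                sawUnexpected.set(true);
            }
        }, 0, 1, TimeUnit.MILLISECONDS);

        // The owning thread keeps reassigning, as repeated rebalances would.
        for (int i = 0; i < 10_000; i++) {
            tracker.assign(i % 2 == 0
                    ? Arrays.asList("t-0", "t-1", "t-2")
                    : Collections.singletonList("t-0"));
        }

        reporter.shutdown();
        reporter.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("final count: " + tracker.numAssignedPartitions()
                + ", unexpected value seen: " + sawUnexpected.get());
    }
}
```

Running `main` prints `final count: 1, unexpected value seen: false`: the reporter only ever observes 0, 1 or 3, never a half-built state. The trade-off matches the one stated in the PR description: a single writer plus a volatile field gives other threads a cheap, safe view of the count without changing the map type for other users, but it does not make `assignedPartitions()` itself safe to call from another thread.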