[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468540#comment-16468540 ]
Chia-Ping Tsai commented on KAFKA-6870:
---------------------------------------

[~rsivaram] Could I take over this issue?

> Concurrency conflicts in SampledStat
> ------------------------------------
>
>                 Key: KAFKA-6870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6870
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.0.0, 1.1.1
>
>
> The list of samples stored in SampledStat is not thread-safe. However, the
> ReplicaFetcherThreads that replicate from specific brokers may update the
> samples (when the list is empty, a new sample is added to it) and iterate
> over them concurrently, which causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>         at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>         at java.util.ArrayList$Itr.next(ArrayList.java:859)
>         at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>         at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>         at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>         at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>         at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>         at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>         at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>         at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
> the ConcurrentModificationException did not occur. All changes to samples
> are currently "add" operations, so accessing elements with get(index)
> avoids the ConcurrentModificationException.
> In short, we can simply make samples thread-safe, or replace the foreach
> loop with get(index) if we are concerned about the performance of a
> thread-safe list.
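
To illustrate the two iteration styles mentioned in the description, here is a minimal single-threaded sketch. It is not Kafka code: the class and method names (SampleIterationSketch, sumWithForEach, sumWithIndex) are hypothetical, and the append inside the loop only stands in for a second thread adding a sample while another thread is iterating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical illustration only; not taken from SampledStat or Rate.
class SampleIterationSketch {

    // Sums the list with a for-each loop, which uses ArrayList's fail-fast iterator.
    // appendDuringLoop simulates another thread adding a sample mid-iteration.
    static double sumWithForEach(List<Double> samples, boolean appendDuringLoop) {
        double total = 0.0;
        boolean appended = false;
        for (Double sample : samples) {
            total += sample;
            if (appendDuringLoop && !appended) {
                samples.add(0.0); // structural modification seen by the iterator
                appended = true;
            }
        }
        return total;
    }

    // Sums the list with get(index); no iterator is involved, so an append
    // during the loop does not trigger the comodification check.
    static double sumWithIndex(List<Double> samples, boolean appendDuringLoop) {
        double total = 0.0;
        boolean appended = false;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i);
            if (appendDuringLoop && !appended) {
                samples.add(0.0);
                appended = true;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> first = new ArrayList<>(Arrays.asList(1.0, 2.0));
        try {
            sumWithForEach(first, true);
        } catch (ConcurrentModificationException e) {
            System.out.println("for-each failed: " + e.getClass().getName());
        }

        List<Double> second = new ArrayList<>(Arrays.asList(1.0, 2.0));
        System.out.println("get(index) total: " + sumWithIndex(second, true));
    }
}
```

Note that get(index) only avoids the exception when the list is append-only. It does not by itself make a plain ArrayList safe to share between threads, so the thread-safe option (for example, synchronizing access to the samples) remains the more conservative choice.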