[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498412#comment-16498412 ]

Kevin Lafferty commented on KAFKA-6870:
---

This depends on [KAFKA-6765|https://issues.apache.org/jira/browse/KAFKA-6765], so that would have to be backported to 1.0 as well. [~rsivaram]

> Concurrency conflicts in SampledStat
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
> Issue Type: Bug
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
> Fix For: 2.0.0, 1.1.1
>
> The samples stored in SampledStat are not thread-safe. However, the ReplicaFetcherThreads that handle replication from the specified brokers may update the samples (a new sample is added when the samples list is empty) and iterate over them concurrently, which causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>     at java.util.ArrayList$Itr.next(ArrayList.java:859)
>     at org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>     at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>     at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>     at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>     at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>     at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>     at kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>     at kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>     at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>     at kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>     at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>     at kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35] the ConcurrentModificationException did not occur, because the only change ever made to samples is "add", and accessing the samples with get(index) avoids the ConcurrentModificationException.
> In short, we can either make samples thread-safe, or replace the foreach loop with get(index) if the performance of a thread-safe list is a concern.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
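To make the two options in the description concrete, here is a small single-threaded Java sketch. The class and method names are hypothetical and this is not Kafka code. An `add` inside the loop stands in for another ReplicaFetcherThread appending a sample while the samples are being iterated. The enhanced `for` loop goes through `ArrayList`'s fail-fast iterator and throws the same `ConcurrentModificationException` raised from `checkForComodification` in the stack trace. The `get(index)` loop does not throw, and a `CopyOnWriteArrayList`, one possible "thread-safe list", iterates over a snapshot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, single-threaded reproduction: the add() inside each loop stands in
// for another thread appending a sample while the samples are being iterated.
final class ComodificationDemo {

    // Enhanced for loop: uses the list's Iterator, which checks modCount on next().
    static long sumWithForEach(List<Long> samples) {
        long total = 0;
        boolean added = false;
        for (long s : samples) {
            total += s;
            if (!added) {
                samples.add(100L); // structural modification mid-iteration
                added = true;
            }
        }
        return total;
    }

    // Index loop: no Iterator is involved, so ArrayList's fail-fast check never runs.
    static long sumWithIndex(List<Long> samples) {
        long total = 0;
        boolean added = false;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i);
            if (!added) {
                samples.add(100L);
                added = true;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        try {
            sumWithForEach(new ArrayList<>(Arrays.asList(1L, 2L, 3L)));
        } catch (ConcurrentModificationException e) {
            System.out.println("ArrayList + foreach: ConcurrentModificationException");
        }
        // Sees the appended element: 1 + 2 + 3 + 100
        System.out.println("ArrayList + get(index): "
                + sumWithIndex(new ArrayList<>(Arrays.asList(1L, 2L, 3L))));
        // Iterates a snapshot taken before the add: 1 + 2 + 3
        System.out.println("CopyOnWriteArrayList + foreach: "
                + sumWithForEach(new CopyOnWriteArrayList<>(Arrays.asList(1L, 2L, 3L))));
    }
}
```

`main` prints the exception line, then `106` for the index loop, which sees the appended element, and `6` for the snapshot iterator. The index loop only sidesteps the fail-fast check. Without synchronization there is no happens-before edge between the writing and reading threads, so on its own it is not a thread-safety fix. `CopyOnWriteArrayList` pays for its snapshots by copying the backing array on every `add`.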
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497450#comment-16497450 ]

Chia-Ping Tsai commented on KAFKA-6870:
---

{quote}Assuming there will be a 1.0.2, can this fix be included there?{quote}
I'm happy to backport this fix to 1.0.2. Should I file a PR against the branch 1.0?
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497222#comment-16497222 ]

Kevin Lafferty commented on KAFKA-6870:
---

Assuming there will be a 1.0.2, can this fix be included there?
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470241#comment-16470241 ]

ASF GitHub Bot commented on KAFKA-6870:
---

rajinisivaram closed pull request #4985: KAFKA-6870 Concurrency conflicts in SampledStat
URL: https://github.com/apache/kafka/pull/4985

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981aee39..48999e1057d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public MetricName metricName() {
     @Override
     @Deprecated
     public double value() {
-        synchronized (this.lock) {
-            return measurableValue(time.milliseconds());
-        }
+        return measurableValue(time.milliseconds());
     }
 
     @Override
@@ -81,10 +79,12 @@ public Measurable measurable() {
     }
 
     double measurableValue(long timeMs) {
-        if (this.metricValueProvider instanceof Measurable)
-            return ((Measurable) metricValueProvider).measure(config, timeMs);
-        else
-            return 0;
+        synchronized (this.lock) {
+            if (this.metricValueProvider instanceof Measurable)
+                return ((Measurable) metricValueProvider).measure(config, timeMs);
+            else
+                return 0;
+        }
     }
 
     public void config(MetricConfig config) {
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index a1c0814c343..8e3dfeb3b9f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -19,19 +19,29 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -124,4 +134,56 @@ public void testIdempotentAdd() {
         assertEquals(1, sensor.metrics().size());
         assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass());
     }
+
+    /**
+     * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads.
+     */
+    @Test
+    public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
+        final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+            // decreasing the value of time window make SampledStat always record the given value
+            .timeWindow(1, TimeUnit.MILLISECONDS)
+            // increasing the value of samples make SampledStat store more samples
+            .samples(100));
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()));
+        final int threadCount = 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        List<Future<Throwable>> workers = new ArrayList<>(threadCount);
+        boolean needShutdown = true;
+        try {
+            for (int i = 0; i != threadCount; ++i) {
+                final int index = i;
+                workers.add(service.submit(new Callable<Throwable>() {
+                    @Override
+                    public Throwable call() {
+                        try {
+                            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                            for (int j
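In the merged diff, `synchronized (this.lock)` moves from the deprecated `value()` into `measurableValue(long)`. That is the method `Sensor.checkQuotas` reaches in the stack trace, so quota checks now measure under the metric's lock. The general pattern is that the code appending a sample and the code iterating the samples hold one shared lock. The sketch below illustrates it under stated assumptions. `LockedSampledTotal`, `record`, `measure` and `stress` are hypothetical names, not Kafka's `SampledStat` API, and the clock is a fake tick counter. Which lock Kafka's own record path holds is not shown in this diff. The harness loosely follows the shape of `testCheckQuotasInMultiThreads`, where a `CountDownLatch` releases workers on a fixed thread pool. That test's loop body is truncated here, so the record-then-measure loop is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical windowed total: NOT Kafka's SampledStat, only the locking pattern.
final class LockedSampledTotal {
    private final Object lock = new Object();
    private final Deque<long[]> samples = new ArrayDeque<>(); // entries: {windowStart, total}
    private final long windowMs;
    private final int maxSamples;

    LockedSampledTotal(long windowMs, int maxSamples) {
        this.windowMs = windowMs;
        this.maxSamples = maxSamples;
    }

    // Writer: may append a new window, i.e. structurally modify the deque.
    void record(long value, long nowMs) {
        synchronized (lock) {
            long[] current = samples.peekLast(); // null when no window exists yet
            if (current == null || nowMs - current[0] >= windowMs) {
                if (samples.size() == maxSamples)
                    samples.pollFirst(); // evict the oldest window
                current = new long[] {nowMs, 0L};
                samples.addLast(current);
            }
            current[1] += value;
        }
    }

    // Reader: iterates the deque, so it takes the same lock as record().
    long measure() {
        synchronized (lock) {
            long total = 0;
            for (long[] s : samples)
                total += s[1];
            return total;
        }
    }

    // Stress harness: workers start together, then record and measure concurrently.
    static long stress(int threadCount, int iterations) {
        // 100-tick windows; capacity exceeds the number of windows, so nothing is evicted.
        LockedSampledTotal stat = new LockedSampledTotal(100, threadCount * iterations);
        AtomicLong clock = new AtomicLong(); // fake clock: one tick per record
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<Object>> workers = new ArrayList<>();
            for (int t = 0; t < threadCount; t++) {
                workers.add(pool.submit(() -> {
                    start.await(); // release all workers at once
                    for (int i = 0; i < iterations; i++) {
                        stat.record(1, clock.incrementAndGet()); // may append a window
                        stat.measure();                          // iterates all windows
                    }
                    return null;
                }));
            }
            start.countDown();
            for (Future<Object> f : workers)
                f.get(); // surfaces any worker failure as an ExecutionException
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("stress run failed", e);
        } finally {
            pool.shutdownNow();
        }
        return stat.measure();
    }

    public static void main(String[] args) {
        System.out.println("final total = " + stress(8, 2_000));
    }
}
```

Running `main` prints `final total = 16000`: 8 workers each record the value 1 2,000 times, and the capacity is high enough that no window is evicted. If the `synchronized (lock)` is dropped from `measure()`, the race returns. `ArrayDeque`'s iterator is also fail-fast, although, like every fail-fast check, it is best-effort and may not fire on every run.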
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468587#comment-16468587 ]

ASF GitHub Bot commented on KAFKA-6870:
---

chia7712 opened a new pull request #4985: KAFKA-6870 Concurrency conflicts in SampledStat
URL: https://github.com/apache/kafka/pull/4985

see [KAFKA-6870](https://issues.apache.org/jira/browse/KAFKA-6870) for more details.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468575#comment-16468575 ]

Chia-Ping Tsai commented on KAFKA-6870:
---

Thanks! [~rsivaram]
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468572#comment-16468572 ]

Rajini Sivaram commented on KAFKA-6870:
---

I have added you as Contributor and assigned the Jira to you.
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468565#comment-16468565 ]

Chia-Ping Tsai commented on KAFKA-6870:
---

{quote}What is your user id?{quote}
chia7712
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468563#comment-16468563 ]

Rajini Sivaram commented on KAFKA-6870:
---

What is your user id?
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468549#comment-16468549 ] Chia-Ping Tsai commented on KAFKA-6870: --- {quote}You can just assign it to yourself {quote} That is the problem. I can't assign this issue to myself...
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468545#comment-16468545 ] Rajini Sivaram commented on KAFKA-6870: --- [~chia7712] Yes, of course. You can just assign it to yourself and submit a PR.
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468540#comment-16468540 ] Chia-Ping Tsai commented on KAFKA-6870: --- [~rsivaram] Could I take over this issue?
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468538#comment-16468538 ] Rajini Sivaram commented on KAFKA-6870: --- [~ijuma] This is a different issue from KAFKA-6765: there is missing synchronization. [~chia7712] Yes, I think it makes sense to add `synchronized(this.lock)` in `KafkaMetric#measurableValue`.
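Rajini's suggestion can be sketched roughly as follows. This is a minimal, hypothetical model and not Kafka's code. `SampledTotal`, `Metric` and `Sensor` are simplified stand-ins. The rule of opening a new sample every 1000 events is an assumption, made only so that the sample list keeps growing the way SampledStat's does.

The point the sketch illustrates is that recording and reading hold the same lock object. `record()` synchronizes on it before it may `add` to the list. `measurableValue()` synchronizes on it before iterating. As a result, the iterator never sees a structural change mid-loop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for SampledStat / KafkaMetric / Sensor.
public class LockedMetricSketch {

    static final class Sample {
        double value;
        int count;
    }

    // Like SampledStat: an unsynchronized ArrayList that grows on record().
    static final class SampledTotal {
        private static final int EVENTS_PER_SAMPLE = 1000; // assumed window rule
        final List<Sample> samples = new ArrayList<>();

        void record(double value) {
            Sample current = samples.isEmpty() ? null : samples.get(samples.size() - 1);
            if (current == null || current.count >= EVENTS_PER_SAMPLE) {
                current = new Sample();
                samples.add(current); // structural modification of the list
            }
            current.value += value;
            current.count++;
        }

        double measure() {
            double total = 0.0;
            for (Sample s : samples) { // fail-fast iterator, as in combine()
                total += s.value;
            }
            return total;
        }
    }

    // Reads the stat while holding the lock shared with its sensor,
    // i.e. the synchronized(this.lock) proposed for measurableValue.
    static final class Metric {
        private final Object lock;
        private final SampledTotal stat;

        Metric(Object lock, SampledTotal stat) {
            this.lock = lock;
            this.stat = stat;
        }

        double measurableValue() {
            synchronized (this.lock) {
                return stat.measure();
            }
        }
    }

    // Records under the same lock object that it hands to its metric.
    static final class Sensor {
        final Object lock = new Object();
        final SampledTotal stat = new SampledTotal();
        final Metric metric = new Metric(lock, stat);

        void record(double value) {
            synchronized (lock) {
                stat.record(value);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Sensor sensor = new Sensor();
        // Two threads record and read, like two fetcher threads sharing one quota sensor.
        Runnable fetcher = () -> {
            for (int i = 0; i < 50_000; i++) {
                sensor.record(1.0);
                sensor.metric.measurableValue();
            }
        };
        Thread t1 = new Thread(fetcher);
        Thread t2 = new Thread(fetcher);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(sensor.metric.measurableValue()); // 100000.0
    }
}
```

Because both paths take the same lock, a quota check and a record on the same sensor can no longer interleave. The cost is that they now run one after the other.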
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468303#comment-16468303 ] Chia-Ping Tsai commented on KAFKA-6870: --- After reading Sensor and KafkaMetric, I think we should make KafkaMetric#measurableValue thread-safe to resolve this issue. A lock is already passed to KafkaMetric, so KafkaMetric should hold that lock when it gets the measured value from metricValueProvider. [~ijuma] [~rsivaram] Any suggestions?
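The missing lock surfaces as `ConcurrentModificationException` rather than a silently wrong value because `ArrayList`'s iterator is fail-fast. Any `add` made after the iterator is created makes the next `next()` call throw, which matches the `ArrayList$Itr.next` frame in the reported stack trace.

The single-threaded sketch below reproduces that deterministically. It contrasts the iterator with the `get(index)` loop suggested as an alternative in the issue description. `FailFastDemo`, `sumWithIterator` and `sumWithIndex` are hypothetical names. The index loop only sidesteps the fail-fast check: without the shared lock, a reader still has no guarantee of seeing a concurrent writer's updates consistently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class FailFastDemo {

    // The enhanced for loop uses an Iterator. The add() stands in for a record()
    // from another thread that lands between two next() calls.
    static double sumWithIterator(List<Double> samples) {
        double total = 0.0;
        boolean added = false;
        for (double s : samples) {
            total += s;
            if (!added) {
                samples.add(0.0); // structural modification: modCount changes
                added = true;
            }
        }
        return total;
    }

    // Index loop: ArrayList.get(int) does not compare modCount, so no CME is thrown.
    static double sumWithIndex(List<Double> samples) {
        double total = 0.0;
        boolean added = false;
        for (int i = 0; i < samples.size(); i++) {
            total += samples.get(i);
            if (!added) {
                samples.add(0.0);
                added = true;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        try {
            sumWithIterator(new ArrayList<>(Arrays.asList(1.0, 2.0)));
            System.out.println("no exception");
        } catch (ConcurrentModificationException e) {
            System.out.println("ConcurrentModificationException"); // this branch runs
        }
        System.out.println(sumWithIndex(new ArrayList<>(Arrays.asList(1.0, 2.0)))); // 3.0
    }
}
```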
[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat
[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465207#comment-16465207 ] Ismael Juma commented on KAFKA-6870: --- Thanks for the report. There was a recent related change by [~rsivaram], but I'm not sure whether it was the same underlying issue. [~rsivaram], can you please check?