[ https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470241#comment-16470241 ]
ASF GitHub Bot commented on KAFKA-6870: --------------------------------------- rajinisivaram closed pull request #4985: KAFKA-6870 Concurrency conflicts in SampledStat URL: https://github.com/apache/kafka/pull/4985 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index f04981aee39..48999e1057d 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -55,9 +55,7 @@ public MetricName metricName() { @Override @Deprecated public double value() { - synchronized (this.lock) { - return measurableValue(time.milliseconds()); - } + return measurableValue(time.milliseconds()); } @Override @@ -81,10 +79,12 @@ public Measurable measurable() { } double measurableValue(long timeMs) { - if (this.metricValueProvider instanceof Measurable) - return ((Measurable) metricValueProvider).measure(config, timeMs); - else - return 0; + synchronized (this.lock) { + if (this.metricValueProvider instanceof Measurable) + return ((Measurable) metricValueProvider).measure(config, timeMs); + else + return 0; + } } public void config(MetricConfig config) { diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java index a1c0814c343..8e3dfeb3b9f 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java @@ -19,19 +19,29 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.stats.Avg; import 
org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Sum; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -124,4 +134,56 @@ public void testIdempotentAdd() { assertEquals(1, sensor.metrics().size()); assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass()); } + + /** + * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads. 
+ */ + @Test + public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException { + final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE)) + // decreasing the value of time window make SampledStat always record the given value + .timeWindow(1, TimeUnit.MILLISECONDS) + // increasing the value of samples make SampledStat store more samples + .samples(100)); + final Sensor sensor = metrics.sensor("sensor"); + + assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate())); + final int threadCount = 10; + final CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newFixedThreadPool(threadCount); + List<Future<Throwable>> workers = new ArrayList<>(threadCount); + boolean needShutdown = true; + try { + for (int i = 0; i != threadCount; ++i) { + final int index = i; + workers.add(service.submit(new Callable<Throwable>() { + @Override + public Throwable call() { + try { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + for (int j = 0; j != 20; ++j) { + sensor.record(j * index, System.currentTimeMillis() + j, false); + sensor.checkQuotas(); + } + return null; + } catch (Throwable e) { + return e; + } + } + })); + } + latch.countDown(); + service.shutdown(); + assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + needShutdown = false; + for (Future<Throwable> callable : workers) { + assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone()); + assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get()); + } + } finally { + if (needShutdown) { + service.shutdownNow(); + } + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Concurrency conflicts in SampledStat > ------------------------------------ > > Key: KAFKA-6870 > URL: https://issues.apache.org/jira/browse/KAFKA-6870 > Project: Kafka > Issue Type: Bug > Reporter: Chia-Ping Tsai > Assignee: Chia-Ping Tsai > Priority: Major > Fix For: 2.0.0, 1.1.1 > > > The samples stored in SampledStat are not thread-safe. However, > ReplicaFetcherThreads, which handle replication from specified brokers, may update > the samples (when the samples list is empty, a new sample is added to it) and iterate the > samples concurrently, which then causes a ConcurrentModificationException. > {code:java} > [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76) > java.util.ConcurrentModificationException > at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) > at java.util.ArrayList$Itr.next(ArrayList.java:859) > at > org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132) > at > org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78) > at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66) > at > org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85) > at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201) > at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192) > at > kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104) > at > kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384) > at > kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263) > at > kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261) > at > 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261) > at > kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102) > at > kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > Before > [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35] > the ConcurrentModificationException did not occur, since all changes to > samples were "add" operations at that time. Using get(index) instead of an iterator avoids the > ConcurrentModificationException. > In short, we can simply make samples thread-safe, or replace the foreach > loop with get(index) if we are concerned about the performance of a thread-safe > list... > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)