[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470241#comment-16470241
 ] 

ASF GitHub Bot commented on KAFKA-6870:
---------------------------------------

rajinisivaram closed pull request #4985: KAFKA-6870 Concurrency conflicts in 
SampledStat
URL: https://github.com/apache/kafka/pull/4985
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981aee39..48999e1057d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public MetricName metricName() {
     @Override
     @Deprecated
     public double value() {
-        synchronized (this.lock) {
-            return measurableValue(time.milliseconds());
-        }
+        return measurableValue(time.milliseconds());
     }
 
     @Override
@@ -81,10 +79,12 @@ public Measurable measurable() {
     }
 
     double measurableValue(long timeMs) {
-        if (this.metricValueProvider instanceof Measurable)
-            return ((Measurable) metricValueProvider).measure(config, timeMs);
-        else
-            return 0;
+        synchronized (this.lock) {
+            if (this.metricValueProvider instanceof Measurable)
+                return ((Measurable) metricValueProvider).measure(config, 
timeMs);
+            else
+                return 0;
+        }
     }
 
     public void config(MetricConfig config) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index a1c0814c343..8e3dfeb3b9f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -19,19 +19,29 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -124,4 +134,56 @@ public void testIdempotentAdd() {
         assertEquals(1, sensor.metrics().size());
         assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, 
sensor.metrics().get(0).measurable().getClass());
     }
+
+    /**
+     * The Sensor#checkQuotas should be thread-safe since the method may be 
used by many ReplicaFetcherThreads.
+     */
+    @Test
+    public void testCheckQuotasInMultiThreads() throws InterruptedException, 
ExecutionException {
+        final Metrics metrics = new Metrics(new 
MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+            // decreasing the value of time window make SampledStat always 
record the given value
+            .timeWindow(1, TimeUnit.MILLISECONDS)
+            // increasing the value of samples make SampledStat store more 
samples
+            .samples(100));
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), 
new Rate()));
+        final int threadCount = 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        List<Future<Throwable>> workers = new ArrayList<>(threadCount);
+        boolean needShutdown = true;
+        try {
+            for (int i = 0; i != threadCount; ++i) {
+                final int index = i;
+                workers.add(service.submit(new Callable<Throwable>() {
+                    @Override
+                    public Throwable call() {
+                        try {
+                            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                            for (int j = 0; j != 20; ++j) {
+                                sensor.record(j * index, 
System.currentTimeMillis() + j, false);
+                                sensor.checkQuotas();
+                            }
+                            return null;
+                        } catch (Throwable e) {
+                            return e;
+                        }
+                    }
+                }));
+            }
+            latch.countDown();
+            service.shutdown();
+            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+            needShutdown = false;
+            for (Future<Throwable> callable : workers) {
+                assertTrue("If this failure happen frequently, we can try to 
increase the wait time", callable.isDone());
+                assertNull("Sensor#checkQuotas SHOULD be thread-safe!", 
callable.get());
+            }
+        } finally {
+            if (needShutdown) {
+                service.shutdownNow();
+            }
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Concurrency conflicts in SampledStat
> ------------------------------------
>
>                 Key: KAFKA-6870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6870
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat are not thread-safe. However, the 
> ReplicaFetcherThreads, which fetch data from their assigned brokers, may 
> concurrently update the samples (a new sample is added when the list is 
> empty) and iterate over them, causing a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
>         at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
>         at java.util.ArrayList$Itr.next(ArrayList.java:859)
>         at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
>         at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
>         at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
>         at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
>         at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
>         at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
>         at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
>         at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
>         at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException did not occur, because the only 
> modification ever made to samples was "add", and iterating with get(index) 
> does not trigger a ConcurrentModificationException.
> In short, we can either make samples thread-safe, or simply replace the 
> foreach loop with get(index)-based iteration if we are concerned about the 
> performance of a thread-safe list.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to