[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-06-01 Thread Kevin Lafferty (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498412#comment-16498412
 ] 

Kevin Lafferty commented on KAFKA-6870:
---

This depends on [KAFKA-6765|https://issues.apache.org/jira/browse/KAFKA-6765] 
so that would have to be backported to 1.0 as well. [~rsivaram]

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-31 Thread Chia-Ping Tsai (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497450#comment-16497450
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

{quote}Assuming there will be a 1.0.2, can this fix be included there?
{quote}
I'm happy to backport this fix to 1.0.2. Should I file a PR against the branch 
1.0?

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-31 Thread Kevin Lafferty (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497222#comment-16497222
 ] 

Kevin Lafferty commented on KAFKA-6870:
---

Assuming there will be a 1.0.2, can this fix be included there?

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470241#comment-16470241
 ] 

ASF GitHub Bot commented on KAFKA-6870:
---

rajinisivaram closed pull request #4985: KAFKA-6870 Concurrency conflicts in 
SampledStat
URL: https://github.com/apache/kafka/pull/4985
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981aee39..48999e1057d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public MetricName metricName() {
 @Override
 @Deprecated
 public double value() {
-synchronized (this.lock) {
-return measurableValue(time.milliseconds());
-}
+return measurableValue(time.milliseconds());
 }
 
 @Override
@@ -81,10 +79,12 @@ public Measurable measurable() {
 }
 
 double measurableValue(long timeMs) {
-if (this.metricValueProvider instanceof Measurable)
-return ((Measurable) metricValueProvider).measure(config, timeMs);
-else
-return 0;
+synchronized (this.lock) {
+if (this.metricValueProvider instanceof Measurable)
+return ((Measurable) metricValueProvider).measure(config, 
timeMs);
+else
+return 0;
+}
 }
 
 public void config(MetricConfig config) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index a1c0814c343..8e3dfeb3b9f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -19,19 +19,29 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -124,4 +134,56 @@ public void testIdempotentAdd() {
 assertEquals(1, sensor.metrics().size());
 assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, 
sensor.metrics().get(0).measurable().getClass());
 }
+
+/**
+ * The Sensor#checkQuotas should be thread-safe since the method may be 
used by many ReplicaFetcherThreads.
+ */
+@Test
+public void testCheckQuotasInMultiThreads() throws InterruptedException, 
ExecutionException {
+final Metrics metrics = new Metrics(new 
MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+// decreasing the value of time window make SampledStat always 
record the given value
+.timeWindow(1, TimeUnit.MILLISECONDS)
+// increasing the value of samples make SampledStat store more 
samples
+.samples(100));
+final Sensor sensor = metrics.sensor("sensor");
+
+assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), 
new Rate()));
+final int threadCount = 10;
+final CountDownLatch latch = new CountDownLatch(1);
+ExecutorService service = Executors.newFixedThreadPool(threadCount);
+List> workers = new ArrayList<>(threadCount);
+boolean needShutdown = true;
+try {
+for (int i = 0; i != threadCount; ++i) {
+final int index = i;
+workers.add(service.submit(new Callable() {
+@Override
+public Throwable call() {
+try {
+assertTrue(latch.await(5, TimeUnit.SECONDS));
+for (int j 

[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468587#comment-16468587
 ] 

ASF GitHub Bot commented on KAFKA-6870:
---

chia7712 opened a new pull request #4985: KAFKA-6870 Concurrency conflicts in 
SampledStat
URL: https://github.com/apache/kafka/pull/4985
 
 
   see [KAFKA-6870](https://issues.apache.org/jira/browse/KAFKA-6870) for more 
details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468575#comment-16468575
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

Thanks! [~rsivaram]

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468572#comment-16468572
 ] 

Rajini Sivaram commented on KAFKA-6870:
---

I have added you as Contributor and assigned the Jira to you.

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468565#comment-16468565
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

{quote}What is your user id?
{quote}
chia7712

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468563#comment-16468563
 ] 

Rajini Sivaram commented on KAFKA-6870:
---

What is your user id?

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468549#comment-16468549
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

{quote}You can just assign it to yourself
{quote}
That is the problem. I can't assign this issue to myself...

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468545#comment-16468545
 ] 

Rajini Sivaram commented on KAFKA-6870:
---

[~chia7712] Yes, of course. You can just assign it to yourself and submit a PR.

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468540#comment-16468540
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

[~rsivaram]  Could I take over this issue?

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-09 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468538#comment-16468538
 ] 

Rajini Sivaram commented on KAFKA-6870:
---

[~ijuma] This is a different issue from KAFKA-6765, there is missing 
synchronization.

[~chia7712] Yes, I think it makes sense to add  `synchronized(this.lock)` in 
`KafkaMetric#measurableValue`.

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-08 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468303#comment-16468303
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

After reading the Sensor and KafkaMetrics, I feel we should make 
KafkaMetric#measurableValue thread-safe to resolve this issue. We have passed a 
lock to KafkaMetrics and KafkaMetrics should use the lock when trying to get 
the measure value from metricValueProvider. [~ijuma]  [~rsivaram] Any 
suggestions? 

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35|https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465207#comment-16465207
 ] 

Ismael Juma commented on KAFKA-6870:


Thanks for the report. There was a recent related by [~rsivaram], but not sure 
if it was the same underlying issue. [~rsivaram], can you please check?

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> The samples stored in SampledStat is not thread-safe. However, 
> ReplicaFetcherThreads used to handle replica to specified brokers may update 
> (when the samples is empty, we will add a new sample to it) and iterate the 
> samples concurrently, and then cause the ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35,]
>  the ConcurrentModificationException doesn't exist since all changes to 
> samples is "add" currently. Using the get(index) is able to avoid the 
> ConcurrentModificationException.
> In short, we can just make samples thread-safe. Or just replace the foreach 
> loop by get(index) if we have concerns about the performance of thread-safe 
> list...
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)