This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b0dd4d5  KAFKA-8011: Fix for race condition causing concurrent 
modification exception (#6338)
b0dd4d5 is described below

commit b0dd4d50b4fe97365db2475d559dd3f1c6eb271a
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Thu Feb 28 18:54:25 2019 -0500

    KAFKA-8011: Fix for race condition causing concurrent modification 
exception (#6338)
    
    In the RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated() and 
RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted() a race 
condition exists where the ConsumerRebalanceListener in the test modifies the 
list of subscribed topics when the condition for the test success is comparing 
the same array instance against expected values.
    
    This PR should fix this race condition by using a CopyOnWriteArrayList 
which guarantees safe traversal of the list even when a concurrent modification 
is happening.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../apache/kafka/streams/integration/RegexSourceIntegrationTest.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d035586..0c6f21e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -146,7 +147,7 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
         pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
-        final List<String> assignedTopics = new ArrayList<>();
+        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {
@@ -195,7 +196,7 @@ public class RegexSourceIntegrationTest {
 
         pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, 
stringSerde));
 
-        final List<String> assignedTopics = new ArrayList<>();
+        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new 
DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, 
Object> config) {

Reply via email to