This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new ce9f522 KAFKA-8011: Fix for race condition causing concurrent modification exception (#6338) ce9f522 is described below commit ce9f522400e9eba5a36b49a725bc28ebaeaf33c8 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Thu Feb 28 18:54:25 2019 -0500 KAFKA-8011: Fix for race condition causing concurrent modification exception (#6338) In the RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated() and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted() a race condition exists where the ConsumerRebalanceListener in the test modifies the list of subscribed topics when the condition for the test success is comparing the same array instance against expected values. This PR should fix this race condition by using a CopyOnWriteArrayList which guarantees safe traversal of the list even when a concurrent modification is happening. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../apache/kafka/streams/integration/RegexSourceIntegrationTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index be87eb2..a3ab289 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -60,6 +60,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -153,7 +154,7 @@ public class RegexSourceIntegrationTest { final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde)); - final List<String> assignedTopics = new ArrayList<>(); + final List<String> assignedTopics = new CopyOnWriteArrayList<>(); streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { @Override public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { @@ -202,7 +203,7 @@ public class RegexSourceIntegrationTest { pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde)); - final List<String> assignedTopics = new ArrayList<>(); + final List<String> assignedTopics = new CopyOnWriteArrayList<>(); streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { @Override public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {