This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 7434ef1 KAFKA-8026: Fix flaky regex source integration test 1.0 (#6463) 7434ef1 is described below commit 7434ef11d090bdf8583363e453552df6a1254f5c Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Mon Mar 25 17:10:23 2019 -0400 KAFKA-8026: Fix flaky regex source integration test 1.0 (#6463) Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io> --- .../integration/RegexSourceIntegrationTest.java | 200 ++++++++++++--------- 1 file changed, 111 insertions(+), 89 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 5f0a107..6773b2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -44,7 +44,6 @@ import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,7 +56,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; @@ -90,12 +90,13 @@ public class RegexSourceIntegrationTest { private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName(); private Properties streamsConfiguration; private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated"; - private KafkaStreams streams; + @Before + public void setUp() throws InterruptedException { + final Properties properties = new Properties(); + properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - @BeforeClass - public static void startKafkaCluster() throws InterruptedException { - CLUSTER.createTopics( + CLUSTER.deleteAndRecreateTopics( TOPIC_1, TOPIC_2, TOPIC_A, @@ -105,26 +106,21 @@ public class RegexSourceIntegrationTest { FA_TOPIC, FOO_TOPIC, DEFAULT_OUTPUT_TOPIC); + + CLUSTER.deleteTopicsAndWait(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2); + CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1); CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1); - } - @Before - public void setUp() { - final Properties properties = new Properties(); - properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", - CLUSTER.bootstrapServers(), - STRING_SERDE_CLASSNAME, - STRING_SERDE_CLASSNAME, - properties); + streamsConfiguration = StreamsTestUtils.getStreamsConfig(UUID.randomUUID().toString(), + CLUSTER.bootstrapServers(), + STRING_SERDE_CLASSNAME, + STRING_SERDE_CLASSNAME, + properties); } @After public void tearDown() throws IOException { - if (streams != null) { - streams.close(); - } // Remove any state from previous test runs IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } @@ -145,8 +141,8 @@ public class RegexSourceIntegrationTest { final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - final List<String> assignedTopics = new CopyOnWriteArrayList<>(); - streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() { + final List<String> assignedTopics = new ArrayList<>(); + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() { @Override public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @@ -159,23 +155,30 @@ public class RegexSourceIntegrationTest { } }); + try { + streams.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + synchronized (assignedTopics) { + return assignedTopics.equals(expectedFirstAssignment); + } + } + }, STREAM_TASKS_NOT_UPDATED); - streams.start(); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return assignedTopics.equals(expectedFirstAssignment); - } - }, STREAM_TASKS_NOT_UPDATED); - - CLUSTER.createTopic("TEST-TOPIC-2"); + CLUSTER.createTopic("TEST-TOPIC-2"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return assignedTopics.equals(expectedSecondAssignment); - } - }, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + synchronized (assignedTopics) { + return assignedTopics.equals(expectedSecondAssignment); + } + } + }, STREAM_TASKS_NOT_UPDATED); + } finally { + streams.close(5, TimeUnit.SECONDS); + } } @@ -196,8 +199,8 @@ public class RegexSourceIntegrationTest { pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - final List<String> assignedTopics = new CopyOnWriteArrayList<>(); - streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() { + final List<String> assignedTopics = new ArrayList<>(); + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() { @Override public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { @@ -210,23 +213,30 @@ public class RegexSourceIntegrationTest { } }); + try { + streams.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + synchronized (assignedTopics) { + return assignedTopics.equals(expectedFirstAssignment); + } + } + }, STREAM_TASKS_NOT_UPDATED); - streams.start(); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return assignedTopics.equals(expectedFirstAssignment); - } - }, STREAM_TASKS_NOT_UPDATED); - - CLUSTER.deleteTopic("TEST-TOPIC-A"); + CLUSTER.deleteTopic("TEST-TOPIC-A"); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return assignedTopics.equals(expectedSecondAssignment); - } - }, STREAM_TASKS_NOT_UPDATED); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + synchronized (assignedTopics) { + return assignedTopics.equals(expectedSecondAssignment); + } + } + }, STREAM_TASKS_NOT_UPDATED); + } finally { + streams.close(5, TimeUnit.SECONDS); + } } @SuppressWarnings("deprecation") @@ -238,12 +248,12 @@ public class RegexSourceIntegrationTest { final long thirtySecondTimeout = 30 * 1000; final TopologyBuilder builder = new TopologyBuilder() - .addSource("ingest", Pattern.compile("topic-\\d+")) - .addProcessor("my-processor", processorSupplier, "ingest") - .addStateStore(stateStoreSupplier, "my-processor"); + .addSource("ingest", Pattern.compile("topic-\\d+")) + .addProcessor("my-processor", processorSupplier, "ingest") + .addStateStore(stateStoreSupplier, "my-processor"); - streams = new KafkaStreams(builder, streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); try { streams.start(); @@ -259,7 +269,7 @@ public class RegexSourceIntegrationTest { TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]"); } finally { - streams.close(); + streams.close(5, TimeUnit.SECONDS); } } @@ -287,31 +297,35 @@ public class RegexSourceIntegrationTest { pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); + try { + streams.start(); - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime); - final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); - final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); - final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6); - final List<String> actualValues = new ArrayList<>(6); + final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); + final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6); + final List<String> actualValues = new ArrayList<>(6); - for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) { - actualValues.add(receivedKeyValue.value); - } + for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) { + actualValues.add(receivedKeyValue.value); + } - Collections.sort(actualValues); - Collections.sort(expectedReceivedValues); - assertThat(actualValues, equalTo(expectedReceivedValues)); + Collections.sort(actualValues); + Collections.sort(expectedReceivedValues); + assertThat(actualValues, equalTo(expectedReceivedValues)); + } finally { + streams.close(5, TimeUnit.SECONDS); + } } @Test @@ -404,18 +418,22 @@ public class RegexSourceIntegrationTest { pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - streams = new KafkaStreams(builder.build(), streamsConfiguration); - streams.start(); + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); + try { + streams.start(); - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime); - IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime); - final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000); - fail("Should not get here"); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000); + fail("Should not get here"); + } finally { + streams.close(5, TimeUnit.SECONDS); + } } private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener { @@ -429,16 +447,20 @@ public class RegexSourceIntegrationTest { @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - assignedTopics.clear(); + synchronized (assignedTopics) { + assignedTopics.clear(); + } listener.onPartitionsRevoked(partitions); } @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - for (final TopicPartition partition : partitions) { - assignedTopics.add(partition.topic()); + synchronized (assignedTopics) { + for (final TopicPartition partition : partitions) { + assignedTopics.add(partition.topic()); + } + Collections.sort(assignedTopics); } - Collections.sort(assignedTopics); listener.onPartitionsAssigned(partitions); } }