Repository: kafka Updated Branches: refs/heads/trunk ed639e826 -> de1b853c3
http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 02e8eb7..310b584 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -14,6 +14,7 @@ */ package org.apache.kafka.streams.integration; +import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -40,10 +41,8 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.StreamsMetadata; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockKeyValueMapper; -import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestCondition; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; +import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -54,28 +53,30 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Map; -import java.util.HashMap; import java.util.TreeSet; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class QueryableStateIntegrationTest { private static final int NUM_BROKERS = 2; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final MockTime mockTime = CLUSTER.time; private static final String STREAM_ONE = "stream-one"; private static final String STREAM_CONCURRENT = "stream-concurrent"; private static final String OUTPUT_TOPIC = "output"; private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent"; - private static final String STREAM_THREE = "stream-three"; + private static final String STREAM_TWO = "stream-two"; private static final int NUM_PARTITIONS = NUM_BROKERS; private static final int NUM_REPLICAS = NUM_BROKERS; private static final long WINDOW_SIZE = 60000L; @@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest { public static void createTopics() { CLUSTER.createTopic(STREAM_ONE); CLUSTER.createTopic(STREAM_CONCURRENT); - CLUSTER.createTopic(STREAM_THREE, NUM_PARTITIONS, NUM_REPLICAS); + CLUSTER.createTopic(STREAM_TWO, NUM_PARTITIONS, NUM_REPLICAS); CLUSTER.createTopic(OUTPUT_TOPIC); CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT); CLUSTER.createTopic(OUTPUT_TOPIC_THREE); @@ -107,12 +108,9 @@ public class QueryableStateIntegrationTest { .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, - TestUtils.tempDirectory("qs-test") - .getPath()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath()); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration - .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); stringComparator = new Comparator<KeyValue<String, String>>() { @@ -130,23 +128,24 @@ public class QueryableStateIntegrationTest { return o1.key.compareTo(o2.key); } }; - inputValues = Arrays.asList("hello world", - "all streams lead to kafka", - "streams", - "kafka streams", - "the cat in the hat", - "green eggs and ham", - "that sam i am", - "up the creek without a paddle", - "run forest run", - "a tank full of gas", - "eat sleep rave repeat", - "one jolly sailor", - "king of the world"); + inputValues = Arrays.asList( + "hello world", + "all streams lead to kafka", + "streams", + "kafka streams", + "the cat in the hat", + "green eggs and ham", + "that sam i am", + "up the creek without a paddle", + "run forest run", + "a tank full of gas", + "eat sleep rave repeat", + "one jolly sailor", + "king of the world"); inputValuesKeys = new HashSet<>(); - for (String sentence : inputValues) { - String[] words = sentence.split("\\W+"); - for (String word : words) { + for (final String sentence : inputValues) { + final String[] words = sentence.split("\\W+"); + for (final String word : words) { inputValuesKeys.add(word); } } @@ -163,20 +162,21 @@ public class QueryableStateIntegrationTest { /** * Creates a typical word count topology + * * @param inputTopic * @param outputTopic * @param streamsConfiguration config * @return */ - private KafkaStreams createCountStream(String inputTopic, String outputTopic, Properties streamsConfiguration) { - KStreamBuilder builder = new KStreamBuilder(); + private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) { + final KStreamBuilder builder = new KStreamBuilder(); final Serde<String> stringSerde = Serdes.String(); final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic); final KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override - public Iterable<String> apply(String value) { + public Iterable<String> apply(final String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) @@ -194,21 +194,22 @@ public class QueryableStateIntegrationTest { private class StreamRunnable implements Runnable { private final KafkaStreams myStream; private boolean closed = false; - StreamRunnable(String inputTopic, String outputTopic, int queryPort) { - Properties props = (Properties) streamsConfiguration.clone(); + + StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) { + final Properties props = (Properties) streamsConfiguration.clone(); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort); - this.myStream = createCountStream(inputTopic, outputTopic, props); + myStream = createCountStream(inputTopic, outputTopic, props); } @Override public void run() { - this.myStream.start(); + myStream.start(); } public void close() { if (!closed) { - this.myStream.close(); + myStream.close(); closed = true; } } @@ -237,7 +238,7 @@ public class QueryableStateIntegrationTest { final ReadOnlyKeyValueStore<String, Long> store; try { store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { // Kafka Streams instance may have closed but rebalance hasn't happened return false; } @@ -249,7 +250,7 @@ public class QueryableStateIntegrationTest { private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams, - final Set<String> keys, final String storeName , + final Set<String> keys, final String storeName, final Long from, final Long to) throws Exception { for (final String key : keys) { TestUtils.waitForCondition(new TestCondition() { @@ -264,7 +265,7 @@ public class QueryableStateIntegrationTest { final ReadOnlyWindowStore<String, Long> store; try { store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { // Kafka Streams instance may have closed but rebalance hasn't happened return false; } @@ -277,18 +278,18 @@ public class QueryableStateIntegrationTest { @Test public void queryOnRebalance() throws Exception { - int numThreads = NUM_PARTITIONS; - StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; - Thread[] streamThreads = new Thread[numThreads]; + final int numThreads = NUM_PARTITIONS; + final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; + final Thread[] streamThreads = new Thread[numThreads]; final int numIterations = 500000; // create concurrent producer - ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_THREE, inputValues, numIterations); - Thread producerThread = new Thread(producerRunnable); + final ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_TWO, inputValues, numIterations); + final Thread producerThread = new Thread(producerRunnable); // create three stream threads for (int i = 0; i < numThreads; i++) { - streamRunnables[i] = new StreamRunnable(STREAM_THREE, OUTPUT_TOPIC_THREE, i); + streamRunnables[i] = new StreamRunnable(STREAM_TWO, OUTPUT_TOPIC_THREE, i); streamThreads[i] = new Thread(streamRunnables[i]); streamThreads[i].start(); } @@ -299,9 +300,9 @@ public class QueryableStateIntegrationTest { for (int i = 0; i < numThreads; i++) { verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys, - "word-count-store-" + STREAM_THREE); + "word-count-store-" + STREAM_TWO); verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys, - "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE); + "windowed-word-count-store-" + STREAM_TWO, 0L, WINDOW_SIZE); } // kill N-1 threads @@ -313,9 +314,9 @@ public class QueryableStateIntegrationTest { // query from the remaining thread verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys, - "word-count-store-" + STREAM_THREE); + "word-count-store-" + STREAM_TWO); verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys, - "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE); + "windowed-word-count-store-" + STREAM_TWO, 0L, WINDOW_SIZE); } finally { for (int i = 0; i < numThreads; i++) { if (!streamRunnables[i].isClosed()) { @@ -335,8 +336,8 @@ public class QueryableStateIntegrationTest { final int numIterations = 500000; - ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations); - Thread producerThread = new Thread(producerRunnable); + final ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations); + final Thread producerThread = new Thread(producerRunnable); kafkaStreams = createCountStream(STREAM_CONCURRENT, OUTPUT_TOPIC_CONCURRENT, streamsConfiguration); kafkaStreams.start(); producerThread.start(); @@ -351,8 +352,8 @@ public class QueryableStateIntegrationTest { kafkaStreams.store("windowed-word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>windowStore()); - Map<String, Long> expectedWindowState = new HashMap<>(); - Map<String, Long> expectedCount = new HashMap<>(); + final Map<String, Long> expectedWindowState = new HashMap<>(); + final Map<String, Long> expectedCount = new HashMap<>(); while (producerRunnable.getCurrIteration() < numIterations) { verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState, expectedCount, windowStore, keyValueStore, false); @@ -369,7 +370,7 @@ public class QueryableStateIntegrationTest { @Test public void shouldBeAbleToQueryState() throws Exception { - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator); @@ -382,7 +383,7 @@ public class QueryableStateIntegrationTest { final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator); - for (String key : keys) { + for (final String key : keys) { expectedCount.add(new KeyValue<>(key, 1L)); } @@ -393,7 +394,8 @@ public class QueryableStateIntegrationTest { CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, - new Properties())); + new Properties()), + mockTime); final KStream<String, String> s1 = builder.stream(STREAM_ONE); @@ -410,12 +412,12 @@ public class QueryableStateIntegrationTest { myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore()); final ReadOnlyWindowStore<String, Long> windowStore = - kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore()); + kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore()); verifyCanGetByKey(keys, - expectedCount, - expectedCount, - windowStore, - myCount); + expectedCount, + expectedCount, + windowStore, + myCount); verifyRangeAndAll(expectedCount, myCount); @@ -463,10 +465,10 @@ public class QueryableStateIntegrationTest { final long timeout = System.currentTimeMillis() + 30000; while (windowState.size() < 5 && - countState.size() < 5 && - System.currentTimeMillis() < timeout) { + countState.size() < 5 && + System.currentTimeMillis() < timeout) { Thread.sleep(10); - for (String key : keys) { + for (final String key : keys) { windowState.addAll(fetch(windowStore, key)); final Long value = myCount.get(key); if (value != null) { @@ -481,28 +483,29 @@ public class QueryableStateIntegrationTest { /** * Verify that the new count is greater than or equal to the previous count. * Note: this method changes the values in expectedWindowState and expectedCount - * @param keys All the keys we ever expect to find + * + * @param keys All the keys we ever expect to find * @param expectedWindowedCount Expected windowed count - * @param expectedCount Expected count - * @param windowStore Window Store - * @param keyValueStore Key-value store - * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, - * the method merely inserts the new found key into the list of - * expected keys. + * @param expectedCount Expected count + * @param windowStore Window Store + * @param keyValueStore Key-value store + * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, + * the method merely inserts the new found key into the list of + * expected keys. * @throws InterruptedException */ private void verifyGreaterOrEqual(final String[] keys, - Map<String, Long> expectedWindowedCount, - Map<String, Long> expectedCount, + final Map<String, Long> expectedWindowedCount, + final Map<String, Long> expectedCount, final ReadOnlyWindowStore<String, Long> windowStore, final ReadOnlyKeyValueStore<String, Long> keyValueStore, - boolean failIfKeyNotFound) + final boolean failIfKeyNotFound) throws InterruptedException { final Map<String, Long> windowState = new HashMap<>(); final Map<String, Long> countState = new HashMap<>(); - for (String key : keys) { - Map<String, Long> map = fetchMap(windowStore, key); + for (final String key : keys) { + final Map<String, Long> map = fetchMap(windowStore, key); if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) { fail("Key not found " + key); } @@ -517,9 +520,9 @@ public class QueryableStateIntegrationTest { } } - for (Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) { + for (final Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) { if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) { - Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey()); + final Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey()); assertTrue(actualWindowStateEntry.getValue() >= expectedValue); } else { if (failIfKeyNotFound) { @@ -530,9 +533,9 @@ public class QueryableStateIntegrationTest { expectedWindowedCount.put(actualWindowStateEntry.getKey(), actualWindowStateEntry.getValue()); } - for (Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) { + for (final Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) { if (expectedCount.containsKey(actualCountStateEntry.getKey())) { - Long expectedValue = expectedCount.get(actualCountStateEntry.getKey()); + final Long expectedValue = expectedCount.get(actualCountStateEntry.getKey()); assertTrue(actualCountStateEntry.getValue() >= expectedValue); } else { if (failIfKeyNotFound) { @@ -545,28 +548,28 @@ public class QueryableStateIntegrationTest { } - private void waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws InterruptedException { + private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException { final Properties config = new Properties(); config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer"); config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); + StringDeserializer.class.getName()); config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - LongDeserializer.class.getName()); - IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, - topic, - numRecs, - 60 * - 1000); + LongDeserializer.class.getName()); + IntegrationTestUtils.waitUntilMinValuesRecordsReceived( + config, + topic, + numRecs, + 60 * 1000); } private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store, - final String key) { + final String key) { final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis()); if (fetch.hasNext()) { - KeyValue<Long, Long> next = fetch.next(); + final KeyValue<Long, Long> next = fetch.next(); return Collections.singleton(KeyValue.pair(key, next.value)); } return Collections.emptySet(); @@ -577,7 +580,7 @@ public class QueryableStateIntegrationTest { final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis()); if (fetch.hasNext()) { - KeyValue<Long, Long> next = fetch.next(); + final KeyValue<Long, Long> next = fetch.next(); return Collections.singletonMap(key, next.value); } return Collections.emptyMap(); @@ -588,13 +591,13 @@ public class QueryableStateIntegrationTest { * A class that periodically produces records in a separate thread */ private class ProducerRunnable implements Runnable { - private String topic; + private final String topic; private final List<String> inputValues; private final int numIterations; private int currIteration = 0; boolean shutdown = false; - ProducerRunnable(String topic, List<String> inputValues, int numIterations) { + ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) { this.topic = topic; this.inputValues = inputValues; this.numIterations = numIterations; @@ -603,16 +606,18 @@ public class QueryableStateIntegrationTest { private synchronized void incrementInteration() { currIteration++; } + public synchronized int getCurrIteration() { return currIteration; } + public synchronized void shutdown() { shutdown = true; } @Override public void run() { - Properties producerConfig = new Properties(); + final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index dd43af6..fe0a0eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -16,6 +16,7 @@ package org.apache.kafka.streams.integration; +import kafka.utils.MockTime; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; @@ -38,8 +39,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -65,11 +66,11 @@ import static org.junit.Assert.fail; * End-to-end integration test based on using regex and named topics for creating sources, using * an embedded Kafka cluster. */ - public class RegexSourceIntegrationTest { private static final int NUM_BROKERS = 1; @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final MockTime mockTime = CLUSTER.time; private static final String TOPIC_1 = "topic-1"; private static final String TOPIC_2 = "topic-2"; @@ -103,8 +104,8 @@ public class RegexSourceIntegrationTest { public void setUp() { streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(), - STRING_SERDE_CLASSNAME, - STRING_SERDE_CLASSNAME); + STRING_SERDE_CLASSNAME, + STRING_SERDE_CLASSNAME); } @After @@ -120,28 +121,28 @@ public class RegexSourceIntegrationTest { final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1"); final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); - StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); + final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); CLUSTER.createTopic("TEST-TOPIC-1"); - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); + final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); - Field streamThreadsField = streams.getClass().getDeclaredField("threads"); + final Field streamThreadsField = streams.getClass().getDeclaredField("threads"); streamThreadsField.setAccessible(true); - StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); - StreamThread originalThread = streamThreads[0]; + final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); + final StreamThread originalThread = streamThreads[0]; final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, - new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + new DefaultKafkaClientSupplier(), + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); - TestCondition oneTopicAdded = new TestCondition() { + final TestCondition oneTopicAdded = new TestCondition() { @Override public boolean conditionMet() { return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment); @@ -155,7 +156,7 @@ public class RegexSourceIntegrationTest { CLUSTER.createTopic("TEST-TOPIC-2"); - TestCondition secondTopicAdded = new TestCondition() { + final TestCondition secondTopicAdded = new TestCondition() { @Override public boolean conditionMet() { return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment); @@ -174,31 +175,31 @@ public class RegexSourceIntegrationTest { final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B"); - StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); + final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); CLUSTER.createTopic("TEST-TOPIC-A"); CLUSTER.createTopic("TEST-TOPIC-B"); - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); + final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); - Field streamThreadsField = streams.getClass().getDeclaredField("threads"); + final Field streamThreadsField = streams.getClass().getDeclaredField("threads"); streamThreadsField.setAccessible(true); - StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); - StreamThread originalThread = streamThreads[0]; + final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); + final StreamThread originalThread = streamThreads[0]; final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, - new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + new DefaultKafkaClientSupplier(), + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); streamThreads[0] = testStreamThread; - TestCondition bothTopicsAdded = new TestCondition() { + final TestCondition bothTopicsAdded = new TestCondition() { @Override public boolean conditionMet() { return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment); @@ -210,7 +211,7 @@ public class RegexSourceIntegrationTest { CLUSTER.deleteTopic("TEST-TOPIC-A"); - TestCondition oneTopicRemoved = new TestCondition() { + final TestCondition oneTopicRemoved = new TestCondition() { @Override public boolean conditionMet() { return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment); @@ -226,45 +227,45 @@ public class RegexSourceIntegrationTest { @Test public void testShouldReadFromRegexAndNamedTopics() throws Exception { - String topic1TestMessage = "topic-1 test"; - String topic2TestMessage = "topic-2 test"; - String topicATestMessage = "topic-A test"; - String topicCTestMessage = "topic-C test"; - String topicYTestMessage = "topic-Y test"; - String topicZTestMessage = "topic-Z test"; + final String topic1TestMessage = "topic-1 test"; + final String topic2TestMessage = "topic-2 test"; + final String topicATestMessage = "topic-A test"; + final String topicCTestMessage = "topic-C test"; + final String topicYTestMessage = "topic-Y test"; + final String topicZTestMessage = "topic-Z test"; final Serde<String> stringSerde = Serdes.String(); - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); - KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d")); - KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]")); - KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); + final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d")); + final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]")); + final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime); - Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); - List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); - List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6); - List<String> actualValues = new ArrayList<>(6); + final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); + final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6); + final List<String> actualValues = new ArrayList<>(6); - for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) { + for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) { actualValues.add(receivedKeyValue.value); } @@ -278,34 +279,34 @@ public class RegexSourceIntegrationTest { @Test(expected = AssertionError.class) public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception { - String fooMessage = "fooMessage"; - String fMessage = "fMessage"; + final String fooMessage = "fooMessage"; + final String fMessage = "fMessage"; final Serde<String> stringSerde = Serdes.String(); - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); // overlapping patterns here, no messages should be sent as TopologyBuilderException // will be thrown when the processor topology is built. - KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*")); - KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*")); + final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*")); + final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*")); pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); - Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig); - IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime); + IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime); - Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); try { IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000); @@ -319,14 +320,14 @@ public class RegexSourceIntegrationTest { private class TestStreamThread extends StreamThread { public volatile List<String> assignedTopicPartitions = new ArrayList<>(); - public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) { + public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) { super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder)); } @Override - public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { - List<String> topicPartitions = new ArrayList<>(); - for (TopicPartition partition : partitions) { + public StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { + final List<String> topicPartitions = new ArrayList<>(); + for (final TopicPartition partition : partitions) { topicPartitions.add(partition.topic()); } Collections.sort(topicPartitions); http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 8e9101d..9c0cbe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; +import kafka.utils.MockTime; import kafka.zk.EmbeddedZookeeper; import org.junit.rules.ExternalResource; import org.slf4j.Logger; @@ -36,15 +37,17 @@ public class EmbeddedKafkaCluster extends ExternalResource { private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; - public EmbeddedKafkaCluster(int numBrokers) { - this.brokers = new KafkaEmbedded[numBrokers]; + public EmbeddedKafkaCluster(final int numBrokers) { + brokers = new KafkaEmbedded[numBrokers]; } + public MockTime time = new MockTime(); + /** * Creates and starts a Kafka cluster. */ public void start() throws IOException, InterruptedException { - Properties brokerConfig = new Properties(); + final Properties brokerConfig = new Properties(); log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); @@ -56,10 +59,10 @@ public class EmbeddedKafkaCluster extends ExternalResource { brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); - for (int i = 0; i < this.brokers.length; i++) { + for (int i = 0; i < brokers.length; i++) { brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); - brokers[i] = new KafkaEmbedded(brokerConfig); + brokers[i] = new KafkaEmbedded(brokerConfig, time); log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", brokers[i].brokerList(), brokers[i].zookeeperConnect()); @@ -70,8 +73,8 @@ public class EmbeddedKafkaCluster extends ExternalResource { * Stop the Kafka cluster. */ public void stop() { - for (int i = 0; i < this.brokers.length; i++) { - brokers[i].stop(); + for (final KafkaEmbedded broker : brokers) { + broker.stop(); } zookeeper.shutdown(); } @@ -79,7 +82,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { /** * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. * Example: `127.0.0.1:2181`. - * + * <p> * You can use this to e.g. tell Kafka brokers how to connect to this instance. */ public String zKConnectString() { @@ -88,7 +91,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { /** * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. - * + * <p> * You can use this to tell Kafka producers how to connect to this cluster. */ public String bootstrapServers() { @@ -108,7 +111,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { * * @param topic The name of the topic. */ - public void createTopic(String topic) { + public void createTopic(final String topic) { createTopic(topic, 1, 1, new Properties()); } @@ -119,7 +122,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(String topic, int partitions, int replication) { + public void createTopic(final String topic, final int partitions, final int replication) { createTopic(topic, partitions, replication, new Properties()); } @@ -131,14 +134,14 @@ public class EmbeddedKafkaCluster extends ExternalResource { * @param replication The replication factor for (partitions of) this topic. * @param topicConfig Additional topic-level configuration settings. */ - public void createTopic(String topic, - int partitions, - int replication, - Properties topicConfig) { + public void createTopic(final String topic, + final int partitions, + final int replication, + final Properties topicConfig) { brokers[0].createTopic(topic, partitions, replication, topicConfig); } - public void deleteTopic(String topic) { + public void deleteTopic(final String topic) { brokers[0].deleteTopic(topic); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 1a1a561..117e6ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration.utils; +import kafka.utils.Time; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -57,10 +58,10 @@ public class IntegrationTestUtils { * @param maxMessages Maximum number of messages to read via the consumer. * @return The values retrieved via the consumer. */ - public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) { - List<V> returnList = new ArrayList<>(); - List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages); - for (KeyValue<?, V> kv : kvs) { + public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final List<V> returnList = new ArrayList<>(); + final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages); + for (final KeyValue<?, V> kv : kvs) { returnList.add(kv.value); } return returnList; @@ -74,7 +75,7 @@ public class IntegrationTestUtils { * @param consumerConfig Kafka consumer configuration * @return The KeyValue elements retrieved via the consumer. */ - public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) { + public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) { return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES); } @@ -87,17 +88,17 @@ public class IntegrationTestUtils { * @param maxMessages Maximum number of messages to read via the consumer * @return The KeyValue elements retrieved via the consumer */ - public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) { - KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig); + public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig); consumer.subscribe(Collections.singletonList(topic)); - int pollIntervalMs = 100; - int maxTotalPollTimeMs = 2000; + final int pollIntervalMs = 100; + final int maxTotalPollTimeMs = 2000; int totalPollTimeMs = 0; - List<KeyValue<K, V>> consumedValues = new ArrayList<>(); + final List<KeyValue<K, V>> consumedValues = new ArrayList<>(); while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; - ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); - for (ConsumerRecord<K, V> record : records) { + final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); + for (final ConsumerRecord<K, V> record : records) { consumedValues.add(new KeyValue<>(record.key(), record.value())); } } @@ -105,7 +106,7 @@ public class IntegrationTestUtils { return consumedValues; } - private static boolean continueConsuming(int messagesConsumed, int maxMessages) { + private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) { return maxMessages <= 0 || messagesConsumed < maxMessages; } @@ -114,11 +115,11 @@ public class IntegrationTestUtils { * * @param streamsConfiguration Streams configuration settings */ - public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException { + public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException { final String tmpDir = TestUtils.IO_TMP_DIR.getPath(); - String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); + final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); if (path != null) { - File node = Paths.get(path).normalize().toFile(); + final File node = Paths.get(path).normalize().toFile(); // Only purge state when it's under java.io.tmpdir. This is a safety net to prevent accidentally // deleting important local directory trees. if (node.getAbsolutePath().startsWith(tmpDir)) { @@ -135,22 +136,25 @@ public class IntegrationTestUtils { * @param <V> Value type of the data records */ public static <K, V> void produceKeyValuesSynchronously( - String topic, Collection<KeyValue<K, V>> records, Properties producerConfig) + final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time) throws ExecutionException, InterruptedException { - produceKeyValuesSynchronouslyWithTimestamp(topic, - records, - producerConfig, - null); + for (final KeyValue<K, V> record : records) { + produceKeyValuesSynchronouslyWithTimestamp(topic, + Collections.singleton(record), + producerConfig, + time.milliseconds()); + time.sleep(1L); + } } - public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, - Collection<KeyValue<K, V>> records, - Properties producerConfig, - Long timestamp) + public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, + final Collection<KeyValue<K, V>> records, + final Properties producerConfig, + final Long timestamp) throws ExecutionException, InterruptedException { - Producer<K, V> producer = new KafkaProducer<>(producerConfig); - for (KeyValue<K, V> record : records) { - Future<RecordMetadata> f = producer.send( + final Producer<K, V> producer = new KafkaProducer<>(producerConfig); + for (final KeyValue<K, V> record : records) { + final Future<RecordMetadata> f = producer.send( new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); f.get(); } @@ -159,92 +163,94 @@ public class IntegrationTestUtils { } public static <V> void produceValuesSynchronously( - String topic, Collection<V> records, Properties producerConfig) + final String topic, final Collection<V> records, final Properties producerConfig, final Time time) throws ExecutionException, InterruptedException { - Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>(); - for (V value : records) { - KeyValue<Object, V> kv = new KeyValue<>(null, value); + final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>(); + for (final V value : records) { + final KeyValue<Object, V> kv = new KeyValue<>(null, value); keyedRecords.add(kv); } - produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); + produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time); } - public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords) throws InterruptedException { + public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } /** * Wait until enough data (key-value records) has been consumed. - * @param consumerConfig Kafka Consumer configuration - * @param topic Topic to consume from + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from * @param expectedNumRecords Minimum number of expected records - * @param waitTime Upper bound in waiting time in milliseconds + * @param waitTime Upper bound in waiting time in milliseconds * @return All the records consumed, or null if no records are consumed * @throws InterruptedException - * @throws AssertionError if the given wait time elapses + * @throws AssertionError if the given wait time elapses */ public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, - long waitTime) throws InterruptedException { + final long waitTime) throws InterruptedException { final List<KeyValue<K, V>> accumData = new ArrayList<>(); - TestCondition valuesRead = new TestCondition() { + final TestCondition valuesRead = new TestCondition() { @Override public boolean conditionMet() { - List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig); + final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig); accumData.addAll(readData); return accumData.size() >= expectedNumRecords; } }; - String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; + final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); return accumData; } - public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig, - String topic, - int expectedNumRecords) throws InterruptedException { + public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } /** * Wait until enough data (value records) has been consumed. - * @param consumerConfig Kafka Consumer configuration - * @param topic Topic to consume from + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from * @param expectedNumRecords Minimum number of expected records - * @param waitTime Upper bound in waiting time in milliseconds + * @param waitTime Upper bound in waiting time in milliseconds * @return All the records consumed, or null if no records are consumed * @throws InterruptedException - * @throws AssertionError if the given wait time elapses + * @throws AssertionError if the given wait time elapses */ public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, - long waitTime) throws InterruptedException { + final long waitTime) throws InterruptedException { final List<V> accumData = new ArrayList<>(); - TestCondition valuesRead = new TestCondition() { + final TestCondition valuesRead = new TestCondition() { @Override public boolean conditionMet() { - List<V> readData = readValues(topic, consumerConfig, expectedNumRecords); + final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords); accumData.addAll(readData); return accumData.size() >= expectedNumRecords; } }; - String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; + final String conditionDetails = "Did not receive " + expectedNumRecords + " number of records"; TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); return accumData; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 43b82d6..ac9b670 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -17,36 +17,33 @@ package org.apache.kafka.streams.integration.utils; - -import org.apache.kafka.common.protocol.SecurityProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; - -import java.io.File; -import java.util.Collections; -import java.util.List; - import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.CoreUtils; -import kafka.utils.SystemTime$; +import kafka.utils.MockTime; import kafka.utils.TestUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + /** * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by * default. - * + * <p> * Requires a running ZooKeeper instance to connect to. */ public class KafkaEmbedded { @@ -56,6 +53,7 @@ public class KafkaEmbedded { private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181"; private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + private final Properties effectiveConfig; private final File logDir; public final TemporaryFolder tmpFolder; @@ -63,20 +61,21 @@ public class KafkaEmbedded { /** * Creates and starts an embedded Kafka broker. + * * @param config Broker configuration settings. Used to modify, for example, on which port the * broker should listen to. Note that you cannot change the `log.dirs` setting * currently. */ - public KafkaEmbedded(Properties config) throws IOException { + public KafkaEmbedded(final Properties config, final MockTime time) throws IOException { tmpFolder = new TemporaryFolder(); tmpFolder.create(); logDir = tmpFolder.newFolder(); effectiveConfig = effectiveConfigFrom(config); - boolean loggingEnabled = true; - KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); + final boolean loggingEnabled = true; + final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", logDir, zookeeperConnect()); - kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$); + kafka = TestUtils.createServer(kafkaConfig, time); log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect()); } @@ -85,12 +84,13 @@ public class KafkaEmbedded { /** * Creates the configuration for starting the Kafka broker by merging default values with * overwrites. + * * @param initialConfig Broker configuration settings that override the default config. * @return * @throws IOException */ - private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { - Properties effectiveConfig = new Properties(); + private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException { + final Properties effectiveConfig = new Properties(); effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0); effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1"); effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092"); @@ -106,7 +106,7 @@ public class KafkaEmbedded { /** * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. - * + * <p> * You can use this to tell Kafka producers and consumers how to connect to this instance. */ public String brokerList() { @@ -130,7 +130,7 @@ public class KafkaEmbedded { kafka.shutdown(); kafka.awaitShutdown(); log.debug("Removing logs.dir at {} ...", logDir); - List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath()); + final List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath()); CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq()); tmpFolder.delete(); log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", @@ -142,7 +142,7 @@ public class KafkaEmbedded { * * @param topic The name of the topic. */ - public void createTopic(String topic) { + public void createTopic(final String topic) { createTopic(topic, 1, 1, new Properties()); } @@ -153,7 +153,7 @@ public class KafkaEmbedded { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(String topic, int partitions, int replication) { + public void createTopic(final String topic, final int partitions, final int replication) { createTopic(topic, partitions, replication, new Properties()); } @@ -165,10 +165,10 @@ public class KafkaEmbedded { * @param replication The replication factor for (partitions of) this topic. * @param topicConfig Additional topic-level configuration settings. */ - public void createTopic(String topic, - int partitions, - int replication, - Properties topicConfig) { + public void createTopic(final String topic, + final int partitions, + final int replication, + final Properties topicConfig) { log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig); @@ -176,29 +176,29 @@ public class KafkaEmbedded { // createTopic() will only seem to work (it will return without error). The topic will exist in // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the // topic. - ZkClient zkClient = new ZkClient( + final ZkClient zkClient = new ZkClient( zookeeperConnect(), DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$); - boolean isSecure = false; - ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); + final boolean isSecure = false; + final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$); zkClient.close(); } - public void deleteTopic(String topic) { + public void deleteTopic(final String topic) { log.debug("Deleting topic { name: {} }", topic); - ZkClient zkClient = new ZkClient( - zookeeperConnect(), - DEFAULT_ZK_SESSION_TIMEOUT_MS, - DEFAULT_ZK_CONNECTION_TIMEOUT_MS, - ZKStringSerializer$.MODULE$); - boolean isSecure = false; - ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); + final ZkClient zkClient = new ZkClient( + zookeeperConnect(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + final boolean isSecure = false; + final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); AdminUtils.deleteTopic(zkUtils, topic); zkClient.close(); } -} \ No newline at end of file +}
