MINOR: changes embedded broker time to MockTime
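The tests previously read the wall clock via `System.currentTimeMillis()`; after this change they drive time through the embedded cluster's shared `MockTime` and hand that time source to the produce helpers, so record timestamps become deterministic. A minimal sketch of the resulting test pattern, assuming an illustrative test class and topic name (the `CLUSTER.time` field, the `mockTime.milliseconds()`/`mockTime.sleep()` calls, and the trailing `mockTime` argument to `produceValuesSynchronously` are taken from the hunks below; nothing else here is part of the commit):

    import kafka.utils.MockTime;
    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
    import org.junit.ClassRule;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.Properties;

    public class MockTimeUsageExampleTest {

        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

        // The embedded cluster owns the clock; tests read and advance it explicitly.
        private final MockTime mockTime = CLUSTER.time;

        @Test
        public void shouldProduceWithDeterministicTimestamps() throws Exception {
            // Previously: System.currentTimeMillis() - 1000 and System.currentTimeMillis().
            final long firstBatchTimestamp = mockTime.milliseconds();
            mockTime.sleep(1000); // advance the mocked clock instead of waiting in real time
            final long secondBatchTimestamp = mockTime.milliseconds();
            // firstBatchTimestamp/secondBatchTimestamp feed the windowed assertions
            // (window start = timestamp / 500 * 500) in the aggregation tests below.

            final Properties producerConfig = new Properties();
            // ... bootstrap servers, acks, retries, serializers, as configured in the tests below ...

            // The produce helpers now take the time source as a trailing argument, so the
            // records they write carry timestamps from the mocked clock.
            IntegrationTestUtils.produceValuesSynchronously(
                "inputTopic", Arrays.asList("hello", "world"), producerConfig, mockTime);
        }
    }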
Author: Matthias J. Sax <[email protected]>
Reviewers: Damian Guy, Ismael Juma, Guozhang Wang

Closes #1808 from mjsax/mockTime

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de1b853c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de1b853c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de1b853c

Branch: refs/heads/trunk
Commit: de1b853c3ed326cf296a56538ca9570b0ecc0636
Parents: ed639e8
Author: Matthias J. Sax <[email protected]>
Authored: Tue Sep 6 15:35:12 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Sep 6 15:35:12 2016 -0700

----------------------------------------------------------------------
 .../integration/FanoutIntegrationTest.java      |  56 ++---
 .../InternalTopicIntegrationTest.java           |  66 +++---
 .../KStreamAggregationIntegrationTest.java      | 127 ++++++------
 .../KStreamKTableJoinIntegrationTest.java       |  58 +++---
 .../integration/KStreamRepartitionJoinTest.java | 167 +++++++--------
 .../QueryableStateIntegrationTest.java          | 203 ++++++++++---------
 .../integration/RegexSourceIntegrationTest.java | 127 ++++++------
 .../integration/utils/EmbeddedKafkaCluster.java |  35 ++--
 .../integration/utils/IntegrationTestUtils.java | 124 +++++------
 .../integration/utils/KafkaEmbedded.java        |  86 ++++----
 10 files changed, 538 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 56cba58..efc427a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -26,31 +27,31 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 /**
  * End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
- *
+ * <p>
  * This example shows how you can read from one input topic/stream, transform the data (here:
  * trivially) in two different ways via two intermediate streams, and then write the respective
  * results to two output topics.
- *
+ * <p>
  * <pre>
  * {@code
 *
@@ -67,6 +68,7 @@ public class FanoutIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String INPUT_TOPIC_A = "A";
     private static final String OUTPUT_TOPIC_B = "B";
     private static final String OUTPUT_TOPIC_C = "C";
@@ -80,10 +82,10 @@ public class FanoutIntegrationTest {
 
     @Test
     public void shouldFanoutTheInput() throws Exception {
-        List<String> inputValues = Arrays.asList("Hello", "World");
-        List<String> expectedValuesForB = new ArrayList<>();
-        List<String> expectedValuesForC = new ArrayList<>();
-        for (String input : inputValues) {
+        final List<String> inputValues = Arrays.asList("Hello", "World");
+        final List<String> expectedValuesForB = new ArrayList<>();
+        final List<String> expectedValuesForC = new ArrayList<>();
+        for (final String input : inputValues) {
             expectedValuesForB.add(input.toUpperCase(Locale.getDefault()));
             expectedValuesForC.add(input.toLowerCase(Locale.getDefault()));
         }
@@ -91,73 +93,73 @@ public class FanoutIntegrationTest {
         //
         // Step 1: Configure and start the processor topology.
         //
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
-        KStream<byte[], String> stream2 = stream1.mapValues(
+        final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
+        final KStream<byte[], String> stream2 = stream1.mapValues(
             new ValueMapper<String, String>() {
                 @Override
-                public String apply(String value) {
+                public String apply(final String value) {
                     return value.toUpperCase(Locale.getDefault());
                 }
             });
-        KStream<byte[], String> stream3 = stream1.mapValues(
+        final KStream<byte[], String> stream3 = stream1.mapValues(
            new ValueMapper<String, String>() {
                @Override
-                public String apply(String value) {
+                public String apply(final String value) {
                    return value.toLowerCase(Locale.getDefault());
                }
            });
         stream2.to(OUTPUT_TOPIC_B);
         stream3.to(OUTPUT_TOPIC_C);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Produce some input data to the input topic.
         //
-        Properties producerConfig = new Properties();
+        final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig, mockTime);
 
         //
         // Step 3: Verify the application's output data.
         //
 
         // Verify output topic B
-        Properties consumerConfigB = new Properties();
+        final Properties consumerConfigB = new Properties();
         consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicB");
         consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
+        final List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
             OUTPUT_TOPIC_B, inputValues.size());
         assertThat(actualValuesForB, equalTo(expectedValuesForB));
 
         // Verify output topic C
-        Properties consumerConfigC = new Properties();
+        final Properties consumerConfigC = new Properties();
         consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicC");
         consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
+        final List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
             OUTPUT_TOPIC_C, inputValues.size());
         streams.close();
         assertThat(actualValuesForC, equalTo(expectedValuesForC));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index f88c1b2..b9a1cf6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,11 @@ package org.apache.kafka.streams.integration;
 
+import kafka.admin.AdminUtils;
+import kafka.log.LogConfig;
+import kafka.utils.MockTime;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,10 +29,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -37,19 +42,16 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.Map;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import kafka.admin.AdminUtils;
-import kafka.log.LogConfig;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.Map;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests related to internal topics in streams
@@ -58,6 +60,7 @@ public class InternalTopicIntegrationTest {
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
@@ -71,6 +74,7 @@ public class InternalTopicIntegrationTest {
 
     /**
      * Validates that any state changelog topics are compacted
+     *
      * @return true if topics have a valid config, false otherwise
     */
    private boolean isUsingCompactionForStateChangelogTopics() {
@@ -80,20 +84,20 @@ public class InternalTopicIntegrationTest {
         // createTopic() will only seem to work (it will return without error). The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
-        ZkClient zkClient = new ZkClient(
+        final ZkClient zkClient = new ZkClient(
             CLUSTER.zKConnectString(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
 
-        Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
-        Iterator it = topicConfigs.iterator();
+        final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
+        final Iterator it = topicConfigs.iterator();
         while (it.hasNext()) {
-            Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
-            String topic = topicConfig._1;
-            Properties prop = topicConfig._2;
+            final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next();
+            final String topic = topicConfig._1;
+            final Properties prop = topicConfig._2;
 
             // state changelogs should be compacted
             if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
@@ -110,7 +114,7 @@ public class InternalTopicIntegrationTest {
 
     @Test
     public void shouldCompactTopicsForStateChangelogs() throws Exception {
-        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
+        final List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
 
         //
         // Step 1: Configure and start a simple word count topology
@@ -118,7 +122,7 @@
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
@@ -126,37 +130,37 @@
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+        final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
-        KStream<String, Long> wordCounts = textLines
+        final KStream<String, Long> wordCounts = textLines
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                 @Override
-                public Iterable<String> apply(String value) {
+                public Iterable<String> apply(final String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
             }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
-            .count("Counts").toStream();
+                .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Produce some input data to the input topic.
         //
-        Properties producerConfig = new Properties();
+        final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime);
 
         //
         // Step 3: Verify the state changelog topics are compact

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 17e197c..6da2a95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -3,13 +3,18 @@
  * agreements. See the NOTICE file distributed with this work for additional information regarding
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -52,10 +57,11 @@ import static org.hamcrest.core.Is.is;
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
+
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static volatile int testNo = 0;
+    private final MockTime mockTime = CLUSTER.time;
     private KStreamBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
@@ -74,8 +80,7 @@ public class KStreamAggregationIntegrationTest {
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        String applicationId = "kgrouped-stream-test-" +
-            testNo;
+        final String applicationId = "kgrouped-stream-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -83,7 +88,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
-        KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
+        final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
         groupedStream = stream
             .groupBy(
@@ -93,7 +98,7 @@ public class KStreamAggregationIntegrationTest {
 
         reducer = new Reducer<String>() {
             @Override
-            public String apply(String value1, String value2) {
+            public String apply(final String value1, final String value2) {
                 return value1 + ":" + value2;
             }
         };
@@ -105,7 +110,7 @@ public class KStreamAggregationIntegrationTest {
         };
         aggregator = new Aggregator<String, String, Integer>() {
             @Override
-            public Integer apply(String aggKey, String value, Integer aggregate) {
+            public Integer apply(final String aggKey, final String value, final Integer aggregate) {
                 return aggregate + value.length();
             }
         };
@@ -122,40 +127,39 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldReduce() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
         groupedStream
             .reduce(reducer, "reduce-by-key")
             .to(Serdes.String(), Serdes.String(), outputTopic);
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, String>> results = receiveMessages(
+        final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
-            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+            public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
 
         assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
-            KeyValue.pair("A", "A:A"),
-            KeyValue.pair("B", "B"),
-            KeyValue.pair("B", "B:B"),
-            KeyValue.pair("C", "C"),
-            KeyValue.pair("C", "C:C"),
-            KeyValue.pair("D", "D"),
-            KeyValue.pair("D", "D:D"),
-            KeyValue.pair("E", "E"),
-            KeyValue.pair("E", "E:E"))));
+                KeyValue.pair("A", "A:A"),
+                KeyValue.pair("B", "B"),
+                KeyValue.pair("B", "B:B"),
+                KeyValue.pair("C", "C"),
+                KeyValue.pair("C", "C:C"),
+                KeyValue.pair("D", "D"),
+                KeyValue.pair("D", "D:D"),
+                KeyValue.pair("E", "E"),
+                KeyValue.pair("E", "E:E"))));
     }
 
-    @SuppressWarnings("unchecked")
     private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
                                                                             final KeyValue<K, V> o2) {
         final int keyComparison = o1.key.compareTo(o2.key);
@@ -167,9 +171,10 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldReduceWindowed() throws Exception {
-        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        final long firstBatchTimestamp = mockTime.milliseconds();
+        mockTime.sleep(1000);
         produceMessages(firstBatchTimestamp);
-        long secondBatchTimestamp = System.currentTimeMillis();
+        final long secondBatchTimestamp = mockTime.milliseconds();
         produceMessages(secondBatchTimestamp);
         produceMessages(secondBatchTimestamp);
@@ -177,7 +182,7 @@ public class KStreamAggregationIntegrationTest {
             .reduce(reducer, TimeWindows.of(500L), "reduce-time-windows")
             .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
                 @Override
-                public String apply(Windowed<String> windowedKey, String value) {
+                public String apply(final Windowed<String> windowedKey, final String value) {
                     return windowedKey.key() + "@" + windowedKey.window().start();
                 }
             })
@@ -185,12 +190,12 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+        final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer()
             , 15);
 
-        Comparator<KeyValue<String, String>>
+        final Comparator<KeyValue<String, String>>
             comparator = new Comparator<KeyValue<String, String>>() {
 
             @Override
@@ -201,8 +206,8 @@ public class KStreamAggregationIntegrationTest {
         };
         Collections.sort(windowedOutput, comparator);
 
-        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
-        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+        final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
         assertThat(windowedOutput, is(
             Arrays.asList(
@@ -227,7 +232,7 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldAggregate() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
         groupedStream.aggregate(
             initializer,
             aggregator,
@@ -237,16 +242,16 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, Integer>> results = receiveMessages(
+        final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
-            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
+            public int compare(final KeyValue<String, Integer> o1, final KeyValue<String, Integer> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
@@ -267,9 +272,10 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldAggregateWindowed() throws Exception {
-        long firstTimestamp = System.currentTimeMillis() - 1000;
+        final long firstTimestamp = mockTime.milliseconds();
+        mockTime.sleep(1000);
         produceMessages(firstTimestamp);
-        long secondTimestamp = System.currentTimeMillis();
+        final long secondTimestamp = mockTime.milliseconds();
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
@@ -280,7 +286,7 @@ public class KStreamAggregationIntegrationTest {
             Serdes.Integer(), "aggregate-by-key-windowed")
             .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
                 @Override
-                public String apply(Windowed<String> windowedKey, Integer value) {
+                public String apply(final Windowed<String> windowedKey, final Integer value) {
                     return windowedKey.key() + "@" + windowedKey.window().start();
                 }
             })
@@ -288,12 +294,12 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer()
             , 15);
 
-        Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, Integer>>
            comparator = new Comparator<KeyValue<String, Integer>>() {
 
            @Override
@@ -305,8 +311,8 @@ public class KStreamAggregationIntegrationTest {
 
         Collections.sort(windowedMessages, comparator);
 
-        long firstWindow = firstTimestamp / 500 * 500;
-        long secondWindow = secondTimestamp / 500 * 500;
+        final long firstWindow = firstTimestamp / 500 * 500;
+        final long secondWindow = secondTimestamp / 500 * 500;
 
         assertThat(windowedMessages, is(
             Arrays.asList(
@@ -330,22 +336,22 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldCount() throws Exception {
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
         groupedStream.count("count-by-key")
             .to(Serdes.String(), Serdes.Long(), outputTopic);
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        produceMessages(mockTime.milliseconds());
 
-        List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
@@ -366,7 +372,7 @@ public class KStreamAggregationIntegrationTest {
 
     @Test
     public void shouldGroupByKey() throws Exception {
-        long timestamp = System.currentTimeMillis();
+        final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
 
@@ -381,18 +387,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer()
             , 10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
-            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
                 return KStreamAggregationIntegrationTest.compare(o1, o2);
             }
         });
 
-        long window = timestamp / 500 * 500;
+        final long window = timestamp / 500 * 500;
         assertThat(results, is(Arrays.asList(
             KeyValue.pair("1@" + window, 1L),
             KeyValue.pair("1@" + window, 2L),
@@ -409,7 +415,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
-    private void produceMessages(long timestamp)
+    private void produceMessages(final long timestamp)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
@@ -450,16 +456,15 @@ public class KStreamAggregationIntegrationTest {
         final Properties consumerProperties = new Properties();
         consumerProperties
             .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
-            testNo);
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            keyDeserializer.getClass().getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            valueDeserializer.getClass().getName());
-        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
-            outputTopic,
-            numMessages, 60 * 1000);
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 536ad24..4a13482 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
@@ -15,6 +15,7 @@
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -26,6 +27,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -41,9 +44,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -52,10 +52,11 @@ import static org.junit.Assert.assertThat;
  * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
  */
 public class KStreamKTableJoinIntegrationTest {
-
     private static final int NUM_BROKERS = 1;
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final String USER_CLICKS_TOPIC = "user-clicks";
     private static final String USER_REGIONS_TOPIC = "user-regions";
     private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name";
@@ -76,7 +77,7 @@ public class KStreamKTableJoinIntegrationTest {
         private final String region;
         private final long clicks;
 
-        public RegionWithClicks(String region, long clicks) {
+        public RegionWithClicks(final String region, final long clicks) {
             if (region == null || region.isEmpty()) {
                 throw new IllegalArgumentException("region must be set");
             }
@@ -100,7 +101,7 @@ public class KStreamKTableJoinIntegrationTest {
     @Test
     public void shouldCountClicksPerRegion() throws Exception {
         // Input 1: Clicks per user (multiple records allowed per user).
-        List<KeyValue<String, Long>> userClicks = Arrays.asList(
+        final List<KeyValue<String, Long>> userClicks = Arrays.asList(
             new KeyValue<>("alice", 13L),
             new KeyValue<>("bob", 4L),
             new KeyValue<>("chao", 25L),
@@ -112,7 +113,7 @@ public class KStreamKTableJoinIntegrationTest {
         );
 
         // Input 2: Region per user (multiple records allowed per user).
-        List<KeyValue<String, String>> userRegions = Arrays.asList(
+        final List<KeyValue<String, String>> userRegions = Arrays.asList(
             new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
             new KeyValue<>("bob", "americas"),
             new KeyValue<>("chao", "asia"),
@@ -122,7 +123,7 @@ public class KStreamKTableJoinIntegrationTest {
             new KeyValue<>("fang", "asia")
         );
 
-        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+        final List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
             new KeyValue<>("europe", 13L),
             new KeyValue<>("americas", 4L),
             new KeyValue<>("asia", 25L),
@@ -139,26 +140,25 @@ public class KStreamKTableJoinIntegrationTest {
         final Serde<String> stringSerde = Serdes.String();
         final Serde<Long> longSerde = Serdes.Long();
 
-        Properties streamsConfiguration = new Properties();
+        final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
-            TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
 
         // This KStream contains information such as "alice" -> 13L.
         //
         // Because this is a KStream ("record stream"), multiple records for the same user will be
         // considered as separate click-count events, each of which will be added to the total count.
-        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+        final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
 
         // This KTable contains information such as "alice" -> "europe".
         //
@@ -171,14 +171,14 @@ public class KStreamKTableJoinIntegrationTest {
         // lived in "asia") because, at the time her first user-click record is being received and
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
-        KTable<String, String> userRegionsTable =
+        final KTable<String, String> userRegionsTable =
             builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC, USER_REGIONS_STORE_NAME);
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.
         //
         // The resulting KTable is continuously being updated as new data records are arriving in the
         // input KStream `userClicksStream` and input KTable `userRegionsTable`.
-        KTable<String, Long> clicksPerRegion = userClicksStream
+        final KTable<String, Long> clicksPerRegion = userClicksStream
             // Join the stream against the table.
             //
             // Null values possible: In general, null values are possible for region (i.e. the value of
@@ -192,14 +192,14 @@
             // achieve the same effect.
             .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
                 @Override
-                public RegionWithClicks apply(Long clicks, String region) {
+                public RegionWithClicks apply(final Long clicks, final String region) {
                     return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
                 }
             })
             // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
             .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
                 @Override
-                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+                public KeyValue<String, Long> apply(final String key, final RegionWithClicks value) {
                     return new KeyValue<>(value.getRegion(), value.getClicks());
                 }
             })
@@ -207,7 +207,7 @@
             .groupByKey(stringSerde, longSerde)
             .reduce(new Reducer<Long>() {
                 @Override
-                public Long apply(Long value1, Long value2) {
+                public Long apply(final Long value1, final Long value2) {
                     return value1 + value2;
                 }
             }, "ClicksPerRegionUnwindowed");
@@ -215,47 +215,47 @@
         // Write the (continuously updating) results to the output topic.
         clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
 
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
         //
         // Step 2: Publish user-region information.
         //
         // To keep this code example simple and easier to understand/reason about, we publish all
-        // user-region records before any user-click records (cf. step 3).  In practice though,
+        // user-region records before any user-click records (cf. step 3). In practice though,
         // data records would typically be arriving concurrently in both input streams/topics.
-        Properties userRegionsProducerConfig = new Properties();
+        final Properties userRegionsProducerConfig = new Properties();
         userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig, mockTime);
 
         //
         // Step 3: Publish some user click events.
         //
-        Properties userClicksProducerConfig = new Properties();
+        final Properties userClicksProducerConfig = new Properties();
         userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig, mockTime);
 
         //
         // Step 4: Verify the application's output data.
         //
-        Properties consumerConfig = new Properties();
+        final Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+        final List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
             OUTPUT_TOPIC, expectedClicksPerRegion.size());
         streams.close();
         assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/de1b853c/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index de9c2c9..e9a7da1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -3,13 +3,18 @@
  * agreements. See the NOTICE file distributed with this work for additional information regarding
  * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -48,10 +53,10 @@ import static org.hamcrest.core.Is.is;
 public class KStreamRepartitionJoinTest {
     private static final int NUM_BROKERS = 1;
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
 
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
     private KStreamBuilder builder;
@@ -70,10 +75,9 @@
 
     private String streamFourInput;
 
-
     @Before
     public void before() {
-        String applicationId = "kstream-repartition-join-test";
+        final String applicationId = "kstream-repartition-join-test";
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
@@ -95,7 +99,7 @@
             }
         };
 
-        keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper();
+        keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
     }
 
     @After
@@ -132,8 +136,8 @@
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() {
-        String mapOneStreamAndJoinOutput = "map-one-join-output";
-        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput, "map-one-join");
+        final String mapOneStreamAndJoinOutput = "map-one-join-output";
+        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
     }
 
@@ -141,7 +145,7 @@
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
+        doJoin(map1, map2, "map-both-streams-and-join");
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
     }
 
@@ -150,7 +154,7 @@
         final KStream<Integer, Integer> mapMapStream = streamOne.map(
             new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                 @Override
-                public KeyValue<Long, Integer> apply(Long key, Integer value) {
+                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
                     if (value == null) {
                         return new KeyValue<>(null, null);
                     }
@@ -158,8 +162,8 @@
                 }
             }).map(keyMapper);
 
-        String outputTopic = "map-map-join";
-        doJoin(mapMapStream, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "map-map-join";
+        doJoin(mapMapStream, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
@@ -167,10 +171,10 @@
     public ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
 
         final KStream<Integer, Integer> keySelected =
-            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
+                streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
 
-        String outputTopic = "select-key-join";
-        doJoin(keySelected, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "select-key-join";
+        doJoin(keySelected, streamTwo, outputTopic);
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
@@ -179,38 +183,36 @@
         final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
             new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                 @Override
-                public Iterable<KeyValue<Integer, Integer>> apply(Long key,
-                                                                  Integer value) {
+                public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
                     return Collections.singletonList(new KeyValue<>(value, value));
                 }
             });
 
-        String outputTopic = "flat-map-join";
-        doJoin(flatMapped, streamTwo, outputTopic, outputTopic);
+        final String outputTopic = "flat-map-join";
+        doJoin(flatMapped, streamTwo, outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
     private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
-        ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
+        final ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
             @Override
-            public String apply(String value1, Integer value2) {
+            public String apply(final String value1, final Integer value2) {
                 return value1 + ":" + value2;
             }
         };
-        String output = "join-rhs-stream-mapped";
+        final String output = "join-rhs-stream-mapped";
         streamTwo
             .join(streamOne.map(keyMapper),
-                  joiner,
-                  getJoinWindow(),
-                  Serdes.Integer(),
-                  Serdes.String(),
-                  Serdes.Integer())
+                joiner,
+                getJoinWindow(),
+                Serdes.Integer(),
+                Serdes.String(),
+                Serdes.Integer())
             .to(Serdes.Integer(), Serdes.String(), output);
-        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"),
-            output);
+        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
     }
 
     public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
@@ -218,53 +220,51 @@
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
-        String outputTopic = "left-join";
+        final String outputTopic = "left-join";
         map1.leftJoin(map2,
-                      valueJoiner,
-                      getJoinWindow(),
-                      Serdes.Integer(),
-                      Serdes.Integer(),
-                      Serdes.String())
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws
-        Exception {
+    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper();
+            kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
         final KStream<Integer, String> join = map1.join(map2,
-                                                        valueJoiner,
-                                                        getJoinWindow(),
-                                                        Serdes.Integer(),
-                                                        Serdes.Integer(),
-                                                        Serdes.String());
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String());
 
-        ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
             @Override
             public String apply(final String value1, final String value2) {
                 return value1 + ":" + value2;
            }
        };
-        String topic = "map-join-join";
+        final String topic = "map-join-join";
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
-                  joiner,
-                  getJoinWindow(),
-                  Serdes.Integer(),
-                  Serdes.String(),
-                  Serdes.String())
+                joiner,
+                getJoinWindow(),
+                Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), topic);
-        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"),
-            topic);
+        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
     }
 
     private JoinWindows getJoinWindow() {
@@ -286,13 +286,14 @@ public class KStreamRepartitionJoinTest {
 
     private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(),
-                                   expectedOutputOnTopic.expectedOutput.size(),
-                                   expectedOutputOnTopic.outputTopic),
-                   is(expectedOutputOnTopic.expectedOutput));
+            expectedOutputOnTopic.expectedOutput.size(),
+            expectedOutputOnTopic.outputTopic),
+            is(expectedOutputOnTopic.expectedOutput));
     }
 
-    private void verifyLeftJoin(ExpectedOutputOnTopic expectedOutputOnTopic)
+
+    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException, ExecutionException {
-        List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
+        final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
             .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
         if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
             produceToStreamOne();
@@ -323,7 +324,8 @@ public class KStreamRepartitionJoinTest {
                 CLUSTER.bootstrapServers(),
                 IntegerSerializer.class,
                 StringSerializer.class,
-                new Properties()));
+                new Properties()),
+            mockTime);
     }
 
     private void produceToStreamOne()
@@ -341,7 +343,8 @@ public class KStreamRepartitionJoinTest {
                 CLUSTER.bootstrapServers(),
                 LongSerializer.class,
                 IntegerSerializer.class,
-                new Properties()));
+                new Properties()),
+            mockTime);
     }
 
     private void createTopics() {
@@ -365,40 +368,38 @@ public class KStreamRepartitionJoinTest {
 
         final Properties config = new Properties();
-        config
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
         config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                           IntegerDeserializer.class.getName());
+            IntegerDeserializer.class.getName());
         config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                           valueDeserializer.getClass().getName());
-        List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
-                                                                                       topic,
-                                                                                       numMessages,
-                                                                                       60 *
-                                                                                       1000);
+            valueDeserializer.getClass().getName());
+        final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+            config,
+            topic,
+            numMessages,
+            60 * 1000);
         Collections.sort(received);
 
         return received;
     }
 
-    private void verifyCorrectOutput(List<String> expectedMessages,
+    private void verifyCorrectOutput(final List<String> expectedMessages,
                                      final String topic) throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
-                   is(expectedMessages));
+            is(expectedMessages));
     }
 
-    private void doJoin(KStream<Integer, Integer> lhs,
-                        KStream<Integer, String> rhs,
-                        String outputTopic,
-                        final String joinName) {
+    private void doJoin(final KStream<Integer, Integer> lhs,
+                        final KStream<Integer, String> rhs,
+                        final String outputTopic) {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
-                 valueJoiner,
-                 getJoinWindow(),
-                 Serdes.Integer(),
-                 Serdes.Integer(),
-                 Serdes.String())
+            valueJoiner,
+            getJoinWindow(),
+            Serdes.Integer(),
+            Serdes.Integer(),
+            Serdes.String())
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
    }
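The diffstat above also lists EmbeddedKafkaCluster.java, IntegrationTestUtils.java, and KafkaEmbedded.java, whose hunks are not shown in this excerpt. Judging only from the call sites (`CLUSTER.time`, the trailing `mockTime` arguments), the cluster presumably exposes one shared `MockTime` roughly along these lines — a hypothetical sketch, not the commit's actual code:

    import kafka.utils.MockTime;
    import org.junit.rules.ExternalResource;

    // Hypothetical wiring: only the public field name `time` is confirmed by the
    // `CLUSTER.time` reads in the tests above; everything else here is illustrative.
    public class EmbeddedKafkaCluster extends ExternalResource {

        // One MockTime shared by the embedded broker(s) and the tests, so broker-side
        // timestamps and test assertions observe the same clock.
        public final MockTime time = new MockTime();

        @Override
        protected void before() throws Throwable {
            // start ZooKeeper and the embedded broker(s), passing `time` down,
            // e.g. something like: new KafkaEmbedded(brokerConfig, time)
        }
    }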
