KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates

Also made a pass over the streams unit tests, with the following changes:

1. Removed three integration tests as they are already covered by other integration tests.
2. Merged `KGroupedTableImplTest` into `KTableAggregateTest`.
3. Used mocks whenever possible to reduce code duplication.

Author: Guozhang Wang <[email protected]>
Reviewers: Damian Guy <[email protected]>, Ismael Juma <[email protected]>
Closes #1604 from guozhangwang/Kminor-unit-tests-consolidation

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/136a8fab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/136a8fab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/136a8fab

Branch: refs/heads/trunk
Commit: 136a8fabca8e266f67897cf5471b2e41c0a341be
Parents: c439268
Author: Guozhang Wang <[email protected]>
Authored: Mon Jul 11 13:57:02 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Jul 11 13:57:02 2016 -0700

----------------------------------------------------------------------
 .../InternalTopicIntegrationTest.java           |  14 +-
 .../integration/JoinIntegrationTest.java        | 259 ----------
 .../KGroupedStreamIntegrationTest.java          | 472 -------------------
 .../KStreamAggregationIntegrationTest.java      | 466 ++++++++++++++++++
 .../KStreamKTableJoinIntegrationTest.java       | 258 ++++++++++
 .../integration/KStreamRepartitionJoinTest.java |  69 +--
 .../integration/MapFunctionIntegrationTest.java | 122 -----
 .../integration/PassThroughIntegrationTest.java | 108 -----
 .../integration/WordCountIntegrationTest.java   | 154 ------
 .../internals/KGroupedTableImplTest.java        |  79 ----
 .../kstream/internals/KTableAggregateTest.java  |  57 ++-
 .../kstream/internals/KTableSourceTest.java     |   4 +-
 .../apache/kafka/test/MockKeyValueMapper.java   |  23 +-
 13 files changed, 808 insertions(+), 1277 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 15469c7..968e060 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -24,13 +24,15 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -41,8 +43,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import kafka.admin.AdminUtils;
 import kafka.log.LogConfig;
 import kafka.utils.ZKStringSerializer$;
@@ -135,12 +135,8 @@ public class InternalTopicIntegrationTest {
             public Iterable<String> apply(String value) {
                 return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
             }
-        }).groupBy(new KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(String key, String value) {
-                return value;
-            }
-        }).count("Counts").toStream();
+        }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+            .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index f99a142..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */ -package org.apache.kafka.streams.integration; - - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.test.TestUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertThat; - -/** - * End-to-end integration test that demonstrates how to perform a join between a KStream and a - * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation. - */ -public class JoinIntegrationTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - private static final String USER_CLICKS_TOPIC = "user-clicks"; - private static final String USER_REGIONS_TOPIC = "user-regions"; - private static final String OUTPUT_TOPIC = "output-topic"; - - @BeforeClass - public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(USER_CLICKS_TOPIC); - CLUSTER.createTopic(USER_REGIONS_TOPIC); - CLUSTER.createTopic(OUTPUT_TOPIC); - } - - /** - * Tuple for a region and its associated number of clicks. - */ - private static final class RegionWithClicks { - - private final String region; - private final long clicks; - - public RegionWithClicks(String region, long clicks) { - if (region == null || region.isEmpty()) { - throw new IllegalArgumentException("region must be set"); - } - if (clicks < 0) { - throw new IllegalArgumentException("clicks must not be negative"); - } - this.region = region; - this.clicks = clicks; - } - - public String getRegion() { - return region; - } - - public long getClicks() { - return clicks; - } - - } - - @Test - public void shouldCountClicksPerRegion() throws Exception { - // Input 1: Clicks per user (multiple records allowed per user). - List<KeyValue<String, Long>> userClicks = Arrays.asList( - new KeyValue<>("alice", 13L), - new KeyValue<>("bob", 4L), - new KeyValue<>("chao", 25L), - new KeyValue<>("bob", 19L), - new KeyValue<>("dave", 56L), - new KeyValue<>("eve", 78L), - new KeyValue<>("alice", 40L), - new KeyValue<>("fang", 99L) - ); - - // Input 2: Region per user (multiple records allowed per user). - List<KeyValue<String, String>> userRegions = Arrays.asList( - new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... 
*/ - new KeyValue<>("bob", "americas"), - new KeyValue<>("chao", "asia"), - new KeyValue<>("dave", "europe"), - new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */ - new KeyValue<>("eve", "americas"), - new KeyValue<>("fang", "asia") - ); - - List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList( - new KeyValue<>("europe", 13L), - new KeyValue<>("americas", 4L), - new KeyValue<>("asia", 25L), - new KeyValue<>("americas", 23L), - new KeyValue<>("europe", 69L), - new KeyValue<>("americas", 101L), - new KeyValue<>("europe", 109L), - new KeyValue<>("asia", 124L) - ); - - // - // Step 1: Configure and start the processor topology. - // - final Serde<String> stringSerde = Serdes.String(); - final Serde<Long> longSerde = Serdes.Long(); - - Properties streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, - TestUtils.tempDirectory().getPath()); - - // Remove any state from previous test runs - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - - KStreamBuilder builder = new KStreamBuilder(); - - // This KStream contains information such as "alice" -> 13L. - // - // Because this is a KStream ("record stream"), multiple records for the same user will be - // considered as separate click-count events, each of which will be added to the total count. - KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC); - - // This KTable contains information such as "alice" -> "europe". - // - // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a - // record key will be considered at the time when a new user-click record (see above) is - // received for the `leftJoin` below. Any previous region values are being considered out of - // date. This behavior is quite different to the KStream for user clicks above. - // - // For example, the user "alice" will be considered to live in "europe" (although originally she - // lived in "asia") because, at the time her first user-click record is being received and - // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe" - // (which overrides her previous region value of "asia"). - KTable<String, String> userRegionsTable = - builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC); - - // Compute the number of clicks per region, e.g. "europe" -> 13L. - // - // The resulting KTable is continuously being updated as new data records are arriving in the - // input KStream `userClicksStream` and input KTable `userRegionsTable`. - KTable<String, Long> clicksPerRegion = userClicksStream - // Join the stream against the table. - // - // Null values possible: In general, null values are possible for region (i.e. the value of - // the KTable we are joining against) so we must guard against that (here: by setting the - // fallback region "UNKNOWN"). 
In this specific example this is not really needed because - // we know, based on the test setup, that all users have appropriate region entries at the - // time we perform the join. - // - // Also, we need to return a tuple of (region, clicks) for each user. But because Java does - // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to - // achieve the same effect. - .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() { - @Override - public RegionWithClicks apply(Long clicks, String region) { - RegionWithClicks regionWithClicks = new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks); - return regionWithClicks; - } - }) - // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> - .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() { - @Override - public KeyValue<String, Long> apply(String key, RegionWithClicks value) { - return new KeyValue<>(value.getRegion(), value.getClicks()); - } - }) - // Compute the total per region by summing the individual click counts per region. - .groupByKey(stringSerde, longSerde) - .reduce(new Reducer<Long>() { - @Override - public Long apply(Long value1, Long value2) { - return value1 + value2; - } - }, "ClicksPerRegionUnwindowed"); - - // Write the (continuously updating) results to the output topic. - clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC); - - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); - streams.start(); - - // - // Step 2: Publish user-region information. - // - // To keep this code example simple and easier to understand/reason about, we publish all - // user-region records before any user-click records (cf. step 3). In practice though, - // data records would typically be arriving concurrently in both input streams/topics. - Properties userRegionsProducerConfig = new Properties(); - userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig); - - // - // Step 3: Publish some user click events. - // - Properties userClicksProducerConfig = new Properties(); - userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig); - - // - // Step 4: Verify the application's output data. 
- // - Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, - OUTPUT_TOPIC, expectedClicksPerRegion.size()); - streams.close(); - assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion)); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java deleted file mode 100644 index 36340b9..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java +++ /dev/null @@ -1,472 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.streams.integration; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; -import org.apache.kafka.streams.kstream.KGroupedStream; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Reducer; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -public class KGroupedStreamIntegrationTest { - - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = - new EmbeddedSingleNodeKafkaCluster(); - private static volatile int testNo = 0; - private KStreamBuilder builder; - private Properties streamsConfiguration; - private KafkaStreams kafkaStreams; - private String streamOneInput; - private String outputTopic; - private KGroupedStream<String, String> groupedStream; - private Reducer<String> reducer; - private Initializer<Integer> initializer; - private Aggregator<String, String, Integer> aggregator; - private KStream<Integer, String> stream; - - - @Before - public void before() { - testNo++; - builder = new KStreamBuilder(); - createTopics(); - streamsConfiguration = new Properties(); - String applicationId = "kgrouped-stream-test-" + - testNo; - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - - KeyValueMapper<Integer, String, String> - mapper = - new KeyValueMapper<Integer, String, String>() { - @Override - public String apply(Integer key, String value) { - return value; - } - }; - stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); - groupedStream = stream - .groupBy( - mapper, - Serdes.String(), - Serdes.String()); - - reducer = new Reducer<String>() { - @Override - public String apply(String value1, String value2) { - return value1 + ":" 
+ value2; - } - }; - initializer = new Initializer<Integer>() { - @Override - public Integer apply() { - return 0; - } - }; - aggregator = new Aggregator<String, String, Integer>() { - @Override - public Integer apply(String aggKey, String value, Integer aggregate) { - return aggregate + value.length(); - } - }; - } - - @After - public void whenShuttingDown() throws IOException { - if (kafkaStreams != null) { - kafkaStreams.close(); - } - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - } - - - @Test - public void shouldReduce() throws Exception { - produceMessages(System.currentTimeMillis()); - groupedStream - .reduce(reducer, "reduce-by-key") - .to(Serdes.String(), Serdes.String(), outputTopic); - - startStreams(); - - produceMessages(System.currentTimeMillis()); - - List<KeyValue<String, String>> results = receiveMessages( - new StringDeserializer(), - new StringDeserializer() - , 10); - - Collections.sort(results, new Comparator<KeyValue<String, String>>() { - @Override - public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }); - - assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), - KeyValue.pair("A", "A:A"), - KeyValue.pair("B", "B"), - KeyValue.pair("B", "B:B"), - KeyValue.pair("C", "C"), - KeyValue.pair("C", "C:C"), - KeyValue.pair("D", "D"), - KeyValue.pair("D", "D:D"), - KeyValue.pair("E", "E"), - KeyValue.pair("E", "E:E")))); - } - - @SuppressWarnings("unchecked") - private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1, - final KeyValue<K, V> o2) { - final int keyComparison = o1.key.compareTo(o2.key); - if (keyComparison == 0) { - return o1.value.compareTo(o2.value); - } - return keyComparison; - } - - @Test - public void shouldReduceWindowed() throws Exception { - long firstBatchTimestamp = System.currentTimeMillis() - 1000; - produceMessages(firstBatchTimestamp); - long secondBatchTimestamp = System.currentTimeMillis(); - produceMessages(secondBatchTimestamp); - produceMessages(secondBatchTimestamp); - - groupedStream - .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L)) - .toStream(new KeyValueMapper<Windowed<String>, String, String>() { - @Override - public String apply(Windowed<String> windowedKey, String value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) - .to(Serdes.String(), Serdes.String(), outputTopic); - - startStreams(); - - List<KeyValue<String, String>> windowedOutput = receiveMessages( - new StringDeserializer(), - new StringDeserializer() - , 15); - - Comparator<KeyValue<String, String>> - comparator = - new Comparator<KeyValue<String, String>>() { - @Override - public int compare(final KeyValue<String, String> o1, - final KeyValue<String, String> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }; - - Collections.sort(windowedOutput, comparator); - long firstBatchWindow = firstBatchTimestamp / 500 * 500; - long secondBatchWindow = secondBatchTimestamp / 500 * 500; - - assertThat(windowedOutput, is( - Arrays.asList( - new KeyValue<>("A@" + firstBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A:A"), - new KeyValue<>("B@" + firstBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B:B"), - new KeyValue<>("C@" + firstBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, 
"C:C"), - new KeyValue<>("D@" + firstBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D:D"), - new KeyValue<>("E@" + firstBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E:E") - ) - )); - } - - @Test - public void shouldAggregate() throws Exception { - produceMessages(System.currentTimeMillis()); - groupedStream.aggregate( - initializer, - aggregator, - Serdes.Integer(), - "aggregate-by-selected-key") - .to(Serdes.String(), Serdes.Integer(), outputTopic); - - startStreams(); - - produceMessages(System.currentTimeMillis()); - - List<KeyValue<String, Integer>> results = receiveMessages( - new StringDeserializer(), - new IntegerDeserializer() - , 10); - - Collections.sort(results, new Comparator<KeyValue<String, Integer>>() { - @Override - public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }); - - assertThat(results, is(Arrays.asList( - KeyValue.pair("A", 1), - KeyValue.pair("A", 2), - KeyValue.pair("B", 1), - KeyValue.pair("B", 2), - KeyValue.pair("C", 1), - KeyValue.pair("C", 2), - KeyValue.pair("D", 1), - KeyValue.pair("D", 2), - KeyValue.pair("E", 1), - KeyValue.pair("E", 2) - ))); - } - - @Test - public void shouldAggregateWindowed() throws Exception { - long firstTimestamp = System.currentTimeMillis() - 1000; - produceMessages(firstTimestamp); - long secondTimestamp = System.currentTimeMillis(); - produceMessages(secondTimestamp); - produceMessages(secondTimestamp); - - groupedStream.aggregate( - initializer, - aggregator, - TimeWindows.of("aggregate-by-key-windowed", 500L), - Serdes.Integer()) - .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { - @Override - public String apply(Windowed<String> windowedKey, Integer value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) - .to(Serdes.String(), Serdes.Integer(), outputTopic); - - startStreams(); - - List<KeyValue<String, Integer>> windowedMessages = receiveMessages( - new StringDeserializer(), - new IntegerDeserializer() - , 15); - - Comparator<KeyValue<String, Integer>> - comparator = - new Comparator<KeyValue<String, Integer>>() { - @Override - public int compare(final KeyValue<String, Integer> o1, - final KeyValue<String, Integer> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }; - - Collections.sort(windowedMessages, comparator); - - long firstWindow = firstTimestamp / 500 * 500; - long secondWindow = secondTimestamp / 500 * 500; - - assertThat(windowedMessages, is( - Arrays.asList( - new KeyValue<>("A@" + firstWindow, 1), - new KeyValue<>("A@" + secondWindow, 1), - new KeyValue<>("A@" + secondWindow, 2), - new KeyValue<>("B@" + firstWindow, 1), - new KeyValue<>("B@" + secondWindow, 1), - new KeyValue<>("B@" + secondWindow, 2), - new KeyValue<>("C@" + firstWindow, 1), - new KeyValue<>("C@" + secondWindow, 1), - new KeyValue<>("C@" + secondWindow, 2), - new KeyValue<>("D@" + firstWindow, 1), - new KeyValue<>("D@" + secondWindow, 1), - new KeyValue<>("D@" + secondWindow, 2), - new KeyValue<>("E@" + firstWindow, 1), - new KeyValue<>("E@" + secondWindow, 1), - new KeyValue<>("E@" + secondWindow, 2) - ))); - } - - @Test - public void shouldCount() throws Exception { - produceMessages(System.currentTimeMillis()); - - groupedStream.count("count-by-key") - .to(Serdes.String(), Serdes.Long(), outputTopic); - - startStreams(); - - 
produceMessages(System.currentTimeMillis()); - - List<KeyValue<String, Long>> results = receiveMessages( - new StringDeserializer(), - new LongDeserializer() - , 10); - Collections.sort(results, new Comparator<KeyValue<String, Long>>() { - @Override - public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }); - - assertThat(results, is(Arrays.asList( - KeyValue.pair("A", 1L), - KeyValue.pair("A", 2L), - KeyValue.pair("B", 1L), - KeyValue.pair("B", 2L), - KeyValue.pair("C", 1L), - KeyValue.pair("C", 2L), - KeyValue.pair("D", 1L), - KeyValue.pair("D", 2L), - KeyValue.pair("E", 1L), - KeyValue.pair("E", 2L) - ))); - } - - @Test - public void shouldGroupByKey() throws Exception { - long timestamp = System.currentTimeMillis(); - produceMessages(timestamp); - produceMessages(timestamp); - - stream.groupByKey(Serdes.Integer(), Serdes.String()) - .count(TimeWindows.of("count-windows", 500L)) - .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { - @Override - public String apply(final Windowed<Integer> windowedKey, final Long value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }).to(Serdes.String(), Serdes.Long(), outputTopic); - - startStreams(); - - List<KeyValue<String, Long>> results = receiveMessages( - new StringDeserializer(), - new LongDeserializer() - , 10); - Collections.sort(results, new Comparator<KeyValue<String, Long>>() { - @Override - public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { - return KGroupedStreamIntegrationTest.compare(o1, o2); - } - }); - - long window = timestamp / 500 * 500; - assertThat(results, is(Arrays.asList( - KeyValue.pair("1@" + window, 1L), - KeyValue.pair("1@" + window, 2L), - KeyValue.pair("2@" + window, 1L), - KeyValue.pair("2@" + window, 2L), - KeyValue.pair("3@" + window, 1L), - KeyValue.pair("3@" + window, 2L), - KeyValue.pair("4@" + window, 1L), - KeyValue.pair("4@" + window, 2L), - KeyValue.pair("5@" + window, 1L), - KeyValue.pair("5@" + window, 2L) - ))); - - } - - - private void produceMessages(long timestamp) - throws ExecutionException, InterruptedException { - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - streamOneInput, - Arrays.asList( - new KeyValue<>(1, "A"), - new KeyValue<>(2, "B"), - new KeyValue<>(3, "C"), - new KeyValue<>(4, "D"), - new KeyValue<>(5, "E")), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - IntegerSerializer.class, - StringSerializer.class, - new Properties()), - timestamp); - } - - - private void createTopics() { - streamOneInput = "stream-one-" + testNo; - outputTopic = "output-" + testNo; - CLUSTER.createTopic(streamOneInput, 3, 1); - CLUSTER.createTopic(outputTopic); - } - - private void startStreams() { - kafkaStreams = new KafkaStreams(builder, streamsConfiguration); - kafkaStreams.start(); - } - - - private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> - keyDeserializer, - final Deserializer<V> - valueDeserializer, - final int numMessages) - throws InterruptedException { - final Properties consumerProperties = new Properties(); - consumerProperties - .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + - testNo); - consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - 
keyDeserializer.getClass().getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - valueDeserializer.getClass().getName()); - return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, - outputTopic, - numMessages, 60 * 1000); - - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java new file mode 100644 index 0000000..b91a907 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; 
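The shared mock helper this commit consolidates the tests around first appears in fully visible code just below, as MockKeyValueMapper.<Integer, String>SelectValueMapper(). Judging from that call site and from the anonymous mappers it replaces elsewhere in this diff (each of which simply returns the record value), the helper amounts to the following minimal sketch; the exact shape is an assumption, the real class lives in org.apache.kafka.test and is itself touched by this commit:

    import org.apache.kafka.streams.kstream.KeyValueMapper;

    public final class MockKeyValueMapper {

        // Returns a mapper that re-keys each record by its value, replacing
        // the anonymous KeyValueMapper classes deleted elsewhere in this diff.
        public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
            return new KeyValueMapper<K, V, V>() {
                @Override
                public V apply(final K key, final V value) {
                    return value;
                }
            };
        }
    }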
+ +public class KStreamAggregationIntegrationTest { + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + new EmbeddedSingleNodeKafkaCluster(); + private static volatile int testNo = 0; + private KStreamBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String outputTopic; + private KGroupedStream<String, String> groupedStream; + private Reducer<String> reducer; + private Initializer<Integer> initializer; + private Aggregator<String, String, Integer> aggregator; + private KStream<Integer, String> stream; + + + @Before + public void before() { + testNo++; + builder = new KStreamBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + String applicationId = "kgrouped-stream-test-" + + testNo; + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + + KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper(); + stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); + groupedStream = stream + .groupBy( + mapper, + Serdes.String(), + Serdes.String()); + + reducer = new Reducer<String>() { + @Override + public String apply(String value1, String value2) { + return value1 + ":" + value2; + } + }; + initializer = new Initializer<Integer>() { + @Override + public Integer apply() { + return 0; + } + }; + aggregator = new Aggregator<String, String, Integer>() { + @Override + public Integer apply(String aggKey, String value, Integer aggregate) { + return aggregate + value.length(); + } + }; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + + @Test + public void shouldReduce() throws Exception { + produceMessages(System.currentTimeMillis()); + groupedStream + .reduce(reducer, "reduce-by-key") + .to(Serdes.String(), Serdes.String(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, String>> results = receiveMessages( + new StringDeserializer(), + new StringDeserializer() + , 10); + + Collections.sort(results, new Comparator<KeyValue<String, String>>() { + @Override + public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), + KeyValue.pair("A", "A:A"), + KeyValue.pair("B", "B"), + KeyValue.pair("B", "B:B"), + KeyValue.pair("C", "C"), + KeyValue.pair("C", "C:C"), + KeyValue.pair("D", "D"), + KeyValue.pair("D", "D:D"), + KeyValue.pair("E", "E"), + KeyValue.pair("E", "E:E")))); + } + + @SuppressWarnings("unchecked") + private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1, + final KeyValue<K, V> o2) { + final int keyComparison = o1.key.compareTo(o2.key); + if (keyComparison == 0) { + return o1.value.compareTo(o2.value); + } + return keyComparison; + } + + @Test + public void shouldReduceWindowed() throws Exception { + 
long firstBatchTimestamp = System.currentTimeMillis() - 1000; + produceMessages(firstBatchTimestamp); + long secondBatchTimestamp = System.currentTimeMillis(); + produceMessages(secondBatchTimestamp); + produceMessages(secondBatchTimestamp); + + groupedStream + .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L)) + .toStream(new KeyValueMapper<Windowed<String>, String, String>() { + @Override + public String apply(Windowed<String> windowedKey, String value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }) + .to(Serdes.String(), Serdes.String(), outputTopic); + + startStreams(); + + List<KeyValue<String, String>> windowedOutput = receiveMessages( + new StringDeserializer(), + new StringDeserializer() + , 15); + + Comparator<KeyValue<String, String>> + comparator = + new Comparator<KeyValue<String, String>>() { + @Override + public int compare(final KeyValue<String, String> o1, + final KeyValue<String, String> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }; + + Collections.sort(windowedOutput, comparator); + long firstBatchWindow = firstBatchTimestamp / 500 * 500; + long secondBatchWindow = secondBatchTimestamp / 500 * 500; + + assertThat(windowedOutput, is( + Arrays.asList( + new KeyValue<>("A@" + firstBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A:A"), + new KeyValue<>("B@" + firstBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B:B"), + new KeyValue<>("C@" + firstBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C:C"), + new KeyValue<>("D@" + firstBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D:D"), + new KeyValue<>("E@" + firstBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E:E") + ) + )); + } + + @Test + public void shouldAggregate() throws Exception { + produceMessages(System.currentTimeMillis()); + groupedStream.aggregate( + initializer, + aggregator, + Serdes.Integer(), + "aggregate-by-selected-key") + .to(Serdes.String(), Serdes.Integer(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, Integer>> results = receiveMessages( + new StringDeserializer(), + new IntegerDeserializer() + , 10); + + Collections.sort(results, new Comparator<KeyValue<String, Integer>>() { + @Override + public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("A", 2), + KeyValue.pair("B", 1), + KeyValue.pair("B", 2), + KeyValue.pair("C", 1), + KeyValue.pair("C", 2), + KeyValue.pair("D", 1), + KeyValue.pair("D", 2), + KeyValue.pair("E", 1), + KeyValue.pair("E", 2) + ))); + } + + @Test + public void shouldAggregateWindowed() throws Exception { + long firstTimestamp = System.currentTimeMillis() - 1000; + produceMessages(firstTimestamp); + long secondTimestamp = System.currentTimeMillis(); + produceMessages(secondTimestamp); + produceMessages(secondTimestamp); + + groupedStream.aggregate( + initializer, + aggregator, + TimeWindows.of("aggregate-by-key-windowed", 500L), + Serdes.Integer()) + .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { + @Override + public String 
apply(Windowed<String> windowedKey, Integer value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }) + .to(Serdes.String(), Serdes.Integer(), outputTopic); + + startStreams(); + + List<KeyValue<String, Integer>> windowedMessages = receiveMessages( + new StringDeserializer(), + new IntegerDeserializer() + , 15); + + Comparator<KeyValue<String, Integer>> + comparator = + new Comparator<KeyValue<String, Integer>>() { + @Override + public int compare(final KeyValue<String, Integer> o1, + final KeyValue<String, Integer> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }; + + Collections.sort(windowedMessages, comparator); + + long firstWindow = firstTimestamp / 500 * 500; + long secondWindow = secondTimestamp / 500 * 500; + + assertThat(windowedMessages, is( + Arrays.asList( + new KeyValue<>("A@" + firstWindow, 1), + new KeyValue<>("A@" + secondWindow, 1), + new KeyValue<>("A@" + secondWindow, 2), + new KeyValue<>("B@" + firstWindow, 1), + new KeyValue<>("B@" + secondWindow, 1), + new KeyValue<>("B@" + secondWindow, 2), + new KeyValue<>("C@" + firstWindow, 1), + new KeyValue<>("C@" + secondWindow, 1), + new KeyValue<>("C@" + secondWindow, 2), + new KeyValue<>("D@" + firstWindow, 1), + new KeyValue<>("D@" + secondWindow, 1), + new KeyValue<>("D@" + secondWindow, 2), + new KeyValue<>("E@" + firstWindow, 1), + new KeyValue<>("E@" + secondWindow, 1), + new KeyValue<>("E@" + secondWindow, 2) + ))); + } + + @Test + public void shouldCount() throws Exception { + produceMessages(System.currentTimeMillis()); + + groupedStream.count("count-by-key") + .to(Serdes.String(), Serdes.Long(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, Long>> results = receiveMessages( + new StringDeserializer(), + new LongDeserializer() + , 10); + Collections.sort(results, new Comparator<KeyValue<String, Long>>() { + @Override + public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList( + KeyValue.pair("A", 1L), + KeyValue.pair("A", 2L), + KeyValue.pair("B", 1L), + KeyValue.pair("B", 2L), + KeyValue.pair("C", 1L), + KeyValue.pair("C", 2L), + KeyValue.pair("D", 1L), + KeyValue.pair("D", 2L), + KeyValue.pair("E", 1L), + KeyValue.pair("E", 2L) + ))); + } + + @Test + public void shouldGroupByKey() throws Exception { + long timestamp = System.currentTimeMillis(); + produceMessages(timestamp); + produceMessages(timestamp); + + stream.groupByKey(Serdes.Integer(), Serdes.String()) + .count(TimeWindows.of("count-windows", 500L)) + .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { + @Override + public String apply(final Windowed<Integer> windowedKey, final Long value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }).to(Serdes.String(), Serdes.Long(), outputTopic); + + startStreams(); + + List<KeyValue<String, Long>> results = receiveMessages( + new StringDeserializer(), + new LongDeserializer() + , 10); + Collections.sort(results, new Comparator<KeyValue<String, Long>>() { + @Override + public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { + return KStreamAggregationIntegrationTest.compare(o1, o2); + } + }); + + long window = timestamp / 500 * 500; + assertThat(results, is(Arrays.asList( + KeyValue.pair("1@" + window, 1L), + KeyValue.pair("1@" + window, 2L), + KeyValue.pair("2@" + window, 1L), + KeyValue.pair("2@" + window, 2L), + 
KeyValue.pair("3@" + window, 1L), + KeyValue.pair("3@" + window, 2L), + KeyValue.pair("4@" + window, 1L), + KeyValue.pair("4@" + window, 2L), + KeyValue.pair("5@" + window, 1L), + KeyValue.pair("5@" + window, 2L) + ))); + + } + + + private void produceMessages(long timestamp) + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + streamOneInput, + Arrays.asList( + new KeyValue<>(1, "A"), + new KeyValue<>(2, "B"), + new KeyValue<>(3, "C"), + new KeyValue<>(4, "D"), + new KeyValue<>(5, "E")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class, + new Properties()), + timestamp); + } + + + private void createTopics() { + streamOneInput = "stream-one-" + testNo; + outputTopic = "output-" + testNo; + CLUSTER.createTopic(streamOneInput, 3, 1); + CLUSTER.createTopic(outputTopic); + } + + private void startStreams() { + kafkaStreams = new KafkaStreams(builder, streamsConfiguration); + kafkaStreams.start(); + } + + + private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> + keyDeserializer, + final Deserializer<V> + valueDeserializer, + final int numMessages) + throws InterruptedException { + final Properties consumerProperties = new Properties(); + consumerProperties + .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + + testNo); + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + keyDeserializer.getClass().getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + valueDeserializer.getClass().getName()); + return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, + outputTopic, + numMessages, 60 * 1000); + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java new file mode 100644 index 0000000..b7d4fc3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that demonstrates how to perform a join between a KStream and a
+ * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
+ */
+public class KStreamKTableJoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String USER_CLICKS_TOPIC = "user-clicks";
+    private static final String USER_REGIONS_TOPIC = "user-regions";
+    private static final String OUTPUT_TOPIC = "output-topic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(USER_CLICKS_TOPIC);
+        CLUSTER.createTopic(USER_REGIONS_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+    }
+
+    /**
+     * Tuple for a region and its associated number of clicks.
+     */
+    private static final class RegionWithClicks {
+
+        private final String region;
+        private final long clicks;
+
+        public RegionWithClicks(String region, long clicks) {
+            if (region == null || region.isEmpty()) {
+                throw new IllegalArgumentException("region must be set");
+            }
+            if (clicks < 0) {
+                throw new IllegalArgumentException("clicks must not be negative");
+            }
+            this.region = region;
+            this.clicks = clicks;
+        }
+
+        public String getRegion() {
+            return region;
+        }
+
+        public long getClicks() {
+            return clicks;
+        }
+
+    }
+
+    @Test
+    public void shouldCountClicksPerRegion() throws Exception {
+        // Input 1: Clicks per user (multiple records allowed per user).
+        List<KeyValue<String, Long>> userClicks = Arrays.asList(
+            new KeyValue<>("alice", 13L),
+            new KeyValue<>("bob", 4L),
+            new KeyValue<>("chao", 25L),
+            new KeyValue<>("bob", 19L),
+            new KeyValue<>("dave", 56L),
+            new KeyValue<>("eve", 78L),
+            new KeyValue<>("alice", 40L),
+            new KeyValue<>("fang", 99L)
+        );
+
+        // Input 2: Region per user (multiple records allowed per user).
+        List<KeyValue<String, String>> userRegions = Arrays.asList(
+            new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
+            new KeyValue<>("bob", "americas"),
+            new KeyValue<>("chao", "asia"),
+            new KeyValue<>("dave", "europe"),
+            new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
+            new KeyValue<>("eve", "americas"),
+            new KeyValue<>("fang", "asia")
+        );
+
+        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+            new KeyValue<>("europe", 13L),
+            new KeyValue<>("americas", 4L),
+            new KeyValue<>("asia", 25L),
+            new KeyValue<>("americas", 23L),
+            new KeyValue<>("europe", 69L),
+            new KeyValue<>("americas", 101L),
+            new KeyValue<>("europe", 109L),
+            new KeyValue<>("asia", 124L)
+        );
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
+            TestUtils.tempDirectory().getPath());
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        // This KStream contains information such as "alice" -> 13L.
+        //
+        // Because this is a KStream ("record stream"), multiple records for the same user will be
+        // considered as separate click-count events, each of which will be added to the total count.
+        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+
+        // This KTable contains information such as "alice" -> "europe".
+        //
+        // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
+        // record key will be considered at the time when a new user-click record (see above) is
+        // received for the `leftJoin` below. Any previous region values are being considered out of
+        // date. This behavior is quite different to the KStream for user clicks above.
+        //
+        // For example, the user "alice" will be considered to live in "europe" (although originally she
+        // lived in "asia") because, at the time her first user-click record is being received and
+        // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
+        // (which overrides her previous region value of "asia").
+        KTable<String, String> userRegionsTable =
+            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
+
+        // Compute the number of clicks per region, e.g. "europe" -> 13L.
+        //
+        // The resulting KTable is continuously being updated as new data records are arriving in the
+        // input KStream `userClicksStream` and input KTable `userRegionsTable`.
+        KTable<String, Long> clicksPerRegion = userClicksStream
+            // Join the stream against the table.
+            //
+            // Null values possible: In general, null values are possible for region (i.e. the value of
+            // the KTable we are joining against) so we must guard against that (here: by setting the
+            // fallback region "UNKNOWN"). In this specific example this is not really needed because
+            // we know, based on the test setup, that all users have appropriate region entries at the
+            // time we perform the join.
+            //
+            // Also, we need to return a tuple of (region, clicks) for each user. But because Java does
+            // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
+            // achieve the same effect.
+            .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
+                @Override
+                public RegionWithClicks apply(Long clicks, String region) {
+                    return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
+                }
+            })
+            // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+            .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
+                @Override
+                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+                    return new KeyValue<>(value.getRegion(), value.getClicks());
+                }
+            })
+            // Compute the total per region by summing the individual click counts per region.
+            .groupByKey(stringSerde, longSerde)
+            .reduce(new Reducer<Long>() {
+                @Override
+                public Long apply(Long value1, Long value2) {
+                    return value1 + value2;
+                }
+            }, "ClicksPerRegionUnwindowed");
+
+        // Write the (continuously updating) results to the output topic.
+        clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        //
+        // Step 2: Publish user-region information.
+        //
+        // To keep this code example simple and easier to understand/reason about, we publish all
+        // user-region records before any user-click records (cf. step 3). In practice though,
+        // data records would typically be arriving concurrently in both input streams/topics.
+        Properties userRegionsProducerConfig = new Properties();
+        userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+
+        //
+        // Step 3: Publish some user click events.
+        //
+        Properties userClicksProducerConfig = new Properties();
+        userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+
+        //
+        // Step 4: Verify the application's output data.
+        //
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
+            OUTPUT_TOPIC, expectedClicksPerRegion.size());
+        streams.close();
+        assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+    }
+
+}
\ No newline at end of file
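A quick sanity check on the expectedClicksPerRegion list above, worked through from the test inputs: because all user-region records are produced before any clicks (steps 2 and 3), every click joins against the user's latest region, and the reduce() emits an updated running total for that region on each click:

    alice 13  ->  europe   13            (alice's latest region is "europe", not "asia")
    bob    4  ->  americas  4
    chao  25  ->  asia     25
    bob   19  ->  americas  4 + 19 =  23
    dave  56  ->  europe   13 + 56 =  69
    eve   78  ->  americas 23 + 78 = 101
    alice 40  ->  europe   69 + 40 = 109
    fang  99  ->  asia     25 + 99 = 124

This matches the eight expected records, in order.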
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 9aaafe6..434216e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -59,8 +60,7 @@ public class KStreamRepartitionJoinTest {
     private KStream<Integer, String> streamTwo;
     private KStream<Integer, String> streamFour;
     private ValueJoiner<Integer, String, String> valueJoiner;
-    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
-        keyMapper;
+    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
 
     private final List<String> expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C",
         "4:D", "5:E");
@@ -77,16 +77,13 @@ public class KStreamRepartitionJoinTest {
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
-            applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
         streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
         streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
@@ -98,12 +95,7 @@
             }
         };
 
-        keyMapper = new KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>() {
-            @Override
-            public KeyValue<Integer, Integer> apply(final Long key, final Integer value) {
-                return new KeyValue<>(value, value);
-            }
-        };
+        keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper();
     }
 
     @After
@@ -146,19 +138,8 @@
     }
 
     private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
-
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
-
-        final KStream<Integer, String> map2 = streamTwo.map(
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            });
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
+        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
         doJoin(map1, map2, "map-both-streams-and-join", "map-both-join");
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join");
@@ -185,14 +166,8 @@ public ExpectedOutputOnTopic selectKeyAndJoin()
         throws ExecutionException, InterruptedException {
 
-        final KStream<Integer, Integer>
-            keySelected =
-            streamOne.selectKey(new KeyValueMapper<Long, Integer, Integer>() {
-                @Override
-                public Integer apply(final Long key, final Integer value) {
-                    return value;
-                }
-            });
+        final KStream<Integer, Integer> keySelected =
+            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
 
         String outputTopic = "select-key-join";
         doJoin(keySelected, streamTwo, outputTopic, outputTopic);
@@ -239,18 +214,9 @@
     }
 
     public ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
-        final KStream<Integer, String> map2 = streamTwo.map(
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            });
+        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
         String outputTopic = "left-join";
         map1.leftJoin(map2,
@@ -266,19 +232,10 @@ private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined()
         throws Exception {
-        final KStream<Integer, Integer>
-            map1 =
-            streamOne.map(keyMapper);
+        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper =
-            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
-                @Override
-                public KeyValue<Integer, String> apply(Integer key,
-                                                       String value) {
-                    return new KeyValue<>(key, value);
-                }
-            };
+            kvMapper = MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper();
 
         final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
 
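The MockKeyValueMapper factory methods used above (SelectValueKeyValueMapper, NoOpKeyValueMapper, SelectValueMapper) replace the deleted anonymous classes one-for-one. The helper itself is the MockKeyValueMapper.java listed in the diffstat, whose body is not shown in this part of the patch, so the following is only a minimal sketch of the shape these call sites assume, mirroring the behavior of the removed anonymous mappers:

    package org.apache.kafka.test;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KeyValueMapper;

    // Assumed sketch only -- the committed MockKeyValueMapper.java is not shown here.
    // Factory names intentionally start with an uppercase letter to match the call sites.
    public final class MockKeyValueMapper {

        // (k, v) -> KeyValue(k, v): passes the record through unchanged (for map()).
        public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
            return new KeyValueMapper<K, V, KeyValue<K, V>>() {
                @Override
                public KeyValue<K, V> apply(K key, V value) {
                    return new KeyValue<>(key, value);
                }
            };
        }

        // (k, v) -> v: selects the value as the new key (for selectKey()).
        public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
            return new KeyValueMapper<K, V, V>() {
                @Override
                public V apply(K key, V value) {
                    return value;
                }
            };
        }

        // (k, v) -> KeyValue(v, v): uses the value as both new key and value (for map()).
        public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper() {
            return new KeyValueMapper<K, V, KeyValue<V, V>>() {
                @Override
                public KeyValue<V, V> apply(K key, V value) {
                    return new KeyValue<>(value, value);
                }
            };
        }

        private MockKeyValueMapper() {}
    }

With this shape, keyMapper = MockKeyValueMapper.<Long, Integer>SelectValueKeyValueMapper() yields a KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>, exactly the type of the anonymous class it replaces.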
http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
deleted file mode 100644
index 2096d9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.ValueMapper;
-
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test based on a simple map, using an embedded Kafka cluster.
- */
-public class MapFunctionIntegrationTest {
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
-    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void shouldUppercaseTheInput() throws Exception {
-        List<String> inputValues = Arrays.asList("hello", "world");
-        List<String> expectedValues = new ArrayList<>();
-        for (String input : inputValues) {
-            expectedValues.add(input.toUpperCase(Locale.getDefault()));
-        }
-
-        //
-        // Step 1: Configure and start the processor topology.
-        //
-        KStreamBuilder builder = new KStreamBuilder();
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
-        KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
-            @Override
-            public String apply(String value) {
-                return value.toUpperCase(Locale.getDefault());
-            }
-        });
-        uppercased.to(DEFAULT_OUTPUT_TOPIC);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        //
-        // Step 2: Produce some input data to the input topic.
-        //
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
-
-        //
-        // Step 3: Verify the application's output data.
-        //
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-integration-test-standard-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
-            DEFAULT_OUTPUT_TOPIC, inputValues.size());
-        streams.close();
-        assertThat(actualValues, equalTo(expectedValues));
-    }
-
-}
\ No newline at end of file
