Repository: kafka Updated Branches: refs/heads/trunk af9fc503d -> a960faf5f
KAFKA-4105: Queryable state tests Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #1806 from enothereska/queryable-state-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a960faf5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a960faf5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a960faf5 Branch: refs/heads/trunk Commit: a960faf5f48025548de71a84b16e0ccf2a1837ca Parents: af9fc50 Author: Eno Thereska <[email protected]> Authored: Sun Sep 4 21:49:48 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sun Sep 4 21:49:48 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/KafkaStreamsTest.java | 5 +- .../integration/FanoutIntegrationTest.java | 5 +- .../InternalTopicIntegrationTest.java | 5 +- .../KStreamAggregationIntegrationTest.java | 8 +- .../KStreamKTableJoinIntegrationTest.java | 6 +- .../integration/KStreamRepartitionJoinTest.java | 8 +- .../QueryableStateIntegrationTest.java | 410 ++++++++++++++++++- .../integration/RegexSourceIntegrationTest.java | 5 +- .../integration/ResetIntegrationTest.java | 5 +- .../integration/utils/EmbeddedKafkaCluster.java | 144 +++++++ .../utils/EmbeddedSingleNodeKafkaCluster.java | 135 ------ 11 files changed, 566 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index b15c2ee..7330810 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.test.MockMetricsReporter; @@ -35,10 +35,11 @@ import static org.junit.Assert.assertTrue; public class KafkaStreamsTest { + private static final int NUM_BROKERS = 1; // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete // quick enough) @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); @Test public void testStartAndClose() throws Exception { http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 6494533..56cba58 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -38,7 +38,7 @@ import java.util.List; import java.util.Locale; import java.util.Properties; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import static 
org.hamcrest.CoreMatchers.equalTo; @@ -64,8 +64,9 @@ import static org.junit.Assert.assertThat; * </pre> */ public class FanoutIntegrationTest { + private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final String INPUT_TOPIC_A = "A"; private static final String OUTPUT_TOPIC_B = "B"; private static final String OUTPUT_TOPIC_C = "C"; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 968e060..f88c1b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; @@ -55,8 +55,9 @@ import scala.collection.Map; * Tests related to internal topics in streams */ public class InternalTopicIntegrationTest { + private static final int NUM_BROKERS = 1; @ClassRule - public static final 
EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final String DEFAULT_INPUT_TOPIC = "inputTopic"; private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 71aebad..17e197c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -51,10 +51,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; public class KStreamAggregationIntegrationTest { - + private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = - new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = + new 
EmbeddedKafkaCluster(NUM_BROKERS); private static volatile int testNo = 0; private KStreamBuilder builder; private Properties streamsConfiguration; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index e290eb0..536ad24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -41,7 +41,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import static org.hamcrest.CoreMatchers.equalTo; @@ -52,8 +52,10 @@ import static org.junit.Assert.assertThat; * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation. 
*/ public class KStreamKTableJoinIntegrationTest { + + private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final String USER_CLICKS_TOPIC = "user-clicks"; private static final String USER_REGIONS_TOPIC = "user-regions"; private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name"; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index ba6956d..de9c2c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; @@ -47,10 +47,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; public class KStreamRepartitionJoinTest { - + private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = - new 
EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = + new EmbeddedKafkaCluster(NUM_BROKERS); private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 974f109..02e8eb7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -15,24 +15,35 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import 
org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.StreamsMetadata; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.TestUtils; +import org.apache.kafka.test.TestCondition; +import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -43,24 +54,35 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Properties; import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.TreeSet; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; - public class QueryableStateIntegrationTest { - + private static final int NUM_BROKERS = 2; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = - new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = + new EmbeddedKafkaCluster(NUM_BROKERS); private static final String STREAM_ONE = "stream-one"; - private static final String STREAM_TWO = "stream-two"; + private static final String STREAM_CONCURRENT = "stream-concurrent"; private static final String OUTPUT_TOPIC = "output"; - + private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent"; + private static final String STREAM_THREE = "stream-three"; + private static final int NUM_PARTITIONS = NUM_BROKERS; + private static final int NUM_REPLICAS = NUM_BROKERS; + private static final 
long WINDOW_SIZE = 60000L; + private static final String OUTPUT_TOPIC_THREE = "output-three"; private Properties streamsConfiguration; - private KStreamBuilder builder; + private List<String> inputValues; + private Set<String> inputValuesKeys; private KafkaStreams kafkaStreams; private Comparator<KeyValue<String, String>> stringComparator; private Comparator<KeyValue<String, Long>> stringLongComparator; @@ -68,13 +90,15 @@ public class QueryableStateIntegrationTest { @BeforeClass public static void createTopics() { CLUSTER.createTopic(STREAM_ONE); - CLUSTER.createTopic(STREAM_TWO); + CLUSTER.createTopic(STREAM_CONCURRENT); + CLUSTER.createTopic(STREAM_THREE, NUM_PARTITIONS, NUM_REPLICAS); CLUSTER.createTopic(OUTPUT_TOPIC); + CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT); + CLUSTER.createTopic(OUTPUT_TOPIC_THREE); } @Before public void before() throws IOException { - builder = new KStreamBuilder(); streamsConfiguration = new Properties(); final String applicationId = "queryable-state"; @@ -89,6 +113,7 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + stringComparator = new Comparator<KeyValue<String, String>>() { @Override @@ -105,6 +130,26 @@ public class QueryableStateIntegrationTest { return o1.key.compareTo(o2.key); } }; + inputValues = Arrays.asList("hello world", + "all streams lead to kafka", + "streams", + "kafka streams", + "the cat in the hat", + "green eggs and ham", + "that sam i am", + "up the creek without a paddle", + "run forest run", + "a tank full of gas", + "eat sleep rave repeat", + "one jolly sailor", + "king of the world"); + inputValuesKeys = new HashSet<>(); + for (String sentence : inputValues) { + String[] words = sentence.split("\\W+"); + for (String word : words) { + inputValuesKeys.add(word); + } + } } @After @@ -115,8 +160,216 @@ public class 
QueryableStateIntegrationTest { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } + + /** + * Creates a typical word count topology + * @param inputTopic + * @param outputTopic + * @param streamsConfiguration config + * @return + */ + private KafkaStreams createCountStream(String inputTopic, String outputTopic, Properties streamsConfiguration) { + KStreamBuilder builder = new KStreamBuilder(); + final Serde<String> stringSerde = Serdes.String(); + final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic); + + final KGroupedStream<String, String> groupedByWord = textLines + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }) + .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()); + + // Create a State Store for the all time word count + groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic); + + // Create a Windowed State Store that contains the word count for every 1 minute + groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic); + + return new KafkaStreams(builder, streamsConfiguration); + } + + private class StreamRunnable implements Runnable { + private final KafkaStreams myStream; + private boolean closed = false; + StreamRunnable(String inputTopic, String outputTopic, int queryPort) { + Properties props = (Properties) streamsConfiguration.clone(); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort); + this.myStream = createCountStream(inputTopic, outputTopic, props); + } + + @Override + public void run() { + this.myStream.start(); + + } + + public void close() { + if (!closed) { + this.myStream.close(); + closed = true; + } + } + + public boolean isClosed() { + return closed; + } + + public final KafkaStreams getStream() { + 
return myStream; + } + } + + private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams, + final Set<String> keys, final String storeName) throws Exception { + for (final String key : keys) { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + if (metadata == null) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyKeyValueStore<String, Long> store; + try { + store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + } catch (IllegalStateException e) { + // Kafka Streams instance may have closed but rebalance hasn't happened + return false; + } + return store != null && store.get(key) != null; + } + }, 30000, "waiting for metadata, store and value to be non null"); + } + } + + + private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams, + final Set<String> keys, final String storeName , + final Long from, final Long to) throws Exception { + for (final String key : keys) { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer()); + if (metadata == null) { + return false; + } + final int index = metadata.hostInfo().port(); + final KafkaStreams streamsWithKey = streamRunnables[index].getStream(); + final ReadOnlyWindowStore<String, Long> store; + try { + store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore()); + } catch (IllegalStateException e) { + // Kafka Streams instance may have closed but rebalance hasn't happened + return false; + } + return store != null && store.fetch(key, from, to) != null; + } + }, 30000, "waiting for 
metadata, store and value to be non null"); + } + } + + + @Test + public void queryOnRebalance() throws Exception { + int numThreads = NUM_PARTITIONS; + StreamRunnable[] streamRunnables = new StreamRunnable[numThreads]; + Thread[] streamThreads = new Thread[numThreads]; + final int numIterations = 500000; + + // create concurrent producer + ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_THREE, inputValues, numIterations); + Thread producerThread = new Thread(producerRunnable); + + // create three stream threads + for (int i = 0; i < numThreads; i++) { + streamRunnables[i] = new StreamRunnable(STREAM_THREE, OUTPUT_TOPIC_THREE, i); + streamThreads[i] = new Thread(streamRunnables[i]); + streamThreads[i].start(); + } + producerThread.start(); + + try { + waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_THREE, 1); + + for (int i = 0; i < numThreads; i++) { + verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys, + "word-count-store-" + STREAM_THREE); + verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys, + "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE); + } + + // kill N-1 threads + for (int i = 1; i < numThreads; i++) { + streamRunnables[i].close(); + streamThreads[i].interrupt(); + streamThreads[i].join(); + } + + // query from the remaining thread + verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys, + "word-count-store-" + STREAM_THREE); + verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys, + "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE); + } finally { + for (int i = 0; i < numThreads; i++) { + if (!streamRunnables[i].isClosed()) { + streamRunnables[i].close(); + streamThreads[i].interrupt(); + streamThreads[i].join(); + } + } + producerRunnable.shutdown(); + producerThread.interrupt(); + producerThread.join(); + } + } + + @Test + public void concurrentAccesses() throws Exception { + + final 
int numIterations = 500000; + + ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations); + Thread producerThread = new Thread(producerRunnable); + kafkaStreams = createCountStream(STREAM_CONCURRENT, OUTPUT_TOPIC_CONCURRENT, streamsConfiguration); + kafkaStreams.start(); + producerThread.start(); + + try { + waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_CONCURRENT, 1); + + final ReadOnlyKeyValueStore<String, Long> + keyValueStore = kafkaStreams.store("word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>keyValueStore()); + + final ReadOnlyWindowStore<String, Long> windowStore = + kafkaStreams.store("windowed-word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>windowStore()); + + + Map<String, Long> expectedWindowState = new HashMap<>(); + Map<String, Long> expectedCount = new HashMap<>(); + while (producerRunnable.getCurrIteration() < numIterations) { + verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState, + expectedCount, windowStore, keyValueStore, false); + } + // finally check if all keys are there + verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState, + expectedCount, windowStore, keyValueStore, true); + } finally { + producerRunnable.shutdown(); + producerThread.interrupt(); + producerThread.join(); + } + } + @Test public void shouldBeAbleToQueryState() throws Exception { + KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator); @@ -147,11 +400,11 @@ public class QueryableStateIntegrationTest { // Non Windowed s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC); - s1.groupByKey().count(TimeWindows.of(60000L), "windowed-count"); + s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count"); 
kafkaStreams = new KafkaStreams(builder, streamsConfiguration); kafkaStreams.start(); - waitUntilAtLeastOneRecordProcessed(); + waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC, 1); final ReadOnlyKeyValueStore<String, Long> myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore()); @@ -225,7 +478,74 @@ public class QueryableStateIntegrationTest { assertThat(countState, equalTo(expectedCount)); } - private void waitUntilAtLeastOneRecordProcessed() throws InterruptedException { + /** + * Verify that the new count is greater than or equal to the previous count. + * Note: this method changes the values in expectedWindowState and expectedCount + * @param keys All the keys we ever expect to find + * @param expectedWindowedCount Expected windowed count + * @param expectedCount Expected count + * @param windowStore Window Store + * @param keyValueStore Key-value store + * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, + * the method merely inserts the new found key into the list of + * expected keys. 
+ * @throws InterruptedException + */ + private void verifyGreaterOrEqual(final String[] keys, + Map<String, Long> expectedWindowedCount, + Map<String, Long> expectedCount, + final ReadOnlyWindowStore<String, Long> windowStore, + final ReadOnlyKeyValueStore<String, Long> keyValueStore, + boolean failIfKeyNotFound) + throws InterruptedException { + final Map<String, Long> windowState = new HashMap<>(); + final Map<String, Long> countState = new HashMap<>(); + + for (String key : keys) { + Map<String, Long> map = fetchMap(windowStore, key); + if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) { + fail("Key not found " + key); + } + windowState.putAll(map); + final Long value = keyValueStore.get(key); + if (value != null) { + countState.put(key, value); + } else { + if (failIfKeyNotFound) { + fail("Key not found " + key); + } + } + } + + for (Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) { + if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) { + Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey()); + assertTrue(actualWindowStateEntry.getValue() >= expectedValue); + } else { + if (failIfKeyNotFound) { + fail("Key not found " + actualWindowStateEntry.getKey()); + } + } + // return this for next round of comparisons + expectedWindowedCount.put(actualWindowStateEntry.getKey(), actualWindowStateEntry.getValue()); + } + + for (Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) { + if (expectedCount.containsKey(actualCountStateEntry.getKey())) { + Long expectedValue = expectedCount.get(actualCountStateEntry.getKey()); + assertTrue(actualCountStateEntry.getValue() >= expectedValue); + } else { + if (failIfKeyNotFound) { + fail("Key not found " + actualCountStateEntry.getKey()); + } + } + // return this for next round of comparisons + expectedCount.put(actualCountStateEntry.getKey(), actualCountStateEntry.getValue()); + } + + } + + private void 
waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws InterruptedException { final Properties config = new Properties(); config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer"); @@ -235,8 +555,8 @@ public class QueryableStateIntegrationTest { config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, - OUTPUT_TOPIC, - 1, + topic, + numRecs, 60 * 1000); } @@ -252,5 +572,65 @@ public class QueryableStateIntegrationTest { return Collections.emptySet(); } + private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store, + final String key) { + + final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis()); + if (fetch.hasNext()) { + KeyValue<Long, Long> next = fetch.next(); + return Collections.singletonMap(key, next.value); + } + return Collections.emptyMap(); + } + + + /** + * A class that periodically produces records in a separate thread + */ + private class ProducerRunnable implements Runnable { + private String topic; + private final List<String> inputValues; + private final int numIterations; + private int currIteration = 0; + boolean shutdown = false; + + ProducerRunnable(String topic, List<String> inputValues, int numIterations) { + this.topic = topic; + this.inputValues = inputValues; + this.numIterations = numIterations; + } + + private synchronized void incrementInteration() { + currIteration++; + } + public synchronized int getCurrIteration() { + return currIteration; + } + public synchronized void shutdown() { + shutdown = true; + } + + @Override + public void run() { + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + 
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + final KafkaProducer<String, String> + producer = + new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer()); + + while (getCurrIteration() < numIterations && !shutdown) { + for (int i = 0; i < inputValues.size(); i++) { + producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i))); + } + incrementInteration(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 51fa06a..dd43af6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -67,8 +67,9 @@ import static org.junit.Assert.fail; */ public class RegexSourceIntegrationTest { + private static final int NUM_BROKERS = 1; @ClassRule - public static final 
EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final String TOPIC_1 = "topic-1"; private static final String TOPIC_2 = "topic-2"; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 155ec10..79ec117 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -58,8 +58,9 @@ import static org.hamcrest.MatcherAssert.assertThat; * Tests local state store and global application cleanup. 
*/ public class ResetIntegrationTest { + private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final String APP_ID = "cleanup-integration-test"; private static final String INPUT_TOPIC = "inputTopic"; http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java new file mode 100644 index 0000000..8e9101d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration.utils; + +import kafka.server.KafkaConfig$; +import kafka.zk.EmbeddedZookeeper; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +/** + * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and a configurable number of Kafka brokers. + */ +public class EmbeddedKafkaCluster extends ExternalResource { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); + private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + private EmbeddedZookeeper zookeeper = null; + private final KafkaEmbedded[] brokers; + + public EmbeddedKafkaCluster(int numBrokers) { + this.brokers = new KafkaEmbedded[numBrokers]; + } + + /** + * Creates and starts a Kafka cluster. + */ + public void start() throws IOException, InterruptedException { + Properties brokerConfig = new Properties(); + + log.debug("Initiating embedded Kafka cluster startup"); + log.debug("Starting a ZooKeeper instance"); + zookeeper = new EmbeddedZookeeper(); + log.debug("ZooKeeper instance is running at {}", zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); + brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); + brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); + brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); + + for (int i = 0; i < this.brokers.length; i++) { + brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i); + log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); + brokers[i] = new KafkaEmbedded(brokerConfig); + + log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", + brokers[i].brokerList(), 
brokers[i].zookeeperConnect()); + } + } + + /** + * Stop the Kafka cluster. + */ + public void stop() { + for (int i = 0; i < this.brokers.length; i++) { + brokers[i].stop(); + } + zookeeper.shutdown(); + } + + /** + * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `localhost:2181`. + * + * You can use this to e.g. tell Kafka brokers how to connect to this instance. + */ + public String zKConnectString() { + return "localhost:" + zookeeper.port(); + } + + /** + * This cluster's `bootstrap.servers` value, taken from the first broker only. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers how to connect to this cluster. + */ + public String bootstrapServers() { + return brokers[0].brokerList(); + } + + protected void before() throws Throwable { + start(); + } + + protected void after() { + stop(); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic) { + createTopic(topic, 1, 1, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + */ + public void createTopic(String topic, int partitions, int replication) { + createTopic(topic, partitions, replication, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. 
+ */ + public void createTopic(String topic, + int partitions, + int replication, + Properties topicConfig) { + brokers[0].createTopic(topic, partitions, replication, topicConfig); + } + + public void deleteTopic(String topic) { + brokers[0].deleteTopic(topic); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java deleted file mode 100644 index 92290f5..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.integration.utils; - -import kafka.server.KafkaConfig$; -import kafka.zk.EmbeddedZookeeper; -import org.junit.rules.ExternalResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Properties; - -/** - * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. - */ -public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { - - private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); - private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected - private EmbeddedZookeeper zookeeper = null; - private KafkaEmbedded broker = null; - - /** - * Creates and starts a Kafka cluster. - */ - public void start() throws IOException, InterruptedException { - Properties brokerConfig = new Properties(); - - log.debug("Initiating embedded Kafka cluster startup"); - log.debug("Starting a ZooKeeper instance"); - zookeeper = new EmbeddedZookeeper(); - log.debug("ZooKeeper instance is running at {}", zKConnectString()); - brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); - brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); - brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); - brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L); - brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0); - - log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); - broker = new KafkaEmbedded(brokerConfig); - - log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", - broker.brokerList(), broker.zookeeperConnect()); - } - - /** - * Stop the Kafka cluster. 
- */ - public void stop() { - broker.stop(); - zookeeper.shutdown(); - } - - /** - * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. - * Example: `127.0.0.1:2181`. - * - * You can use this to e.g. tell Kafka brokers how to connect to this instance. - */ - public String zKConnectString() { - return "localhost:" + zookeeper.port(); - } - - /** - * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. - * - * You can use this to tell Kafka producers how to connect to this cluster. - */ - public String bootstrapServers() { - return broker.brokerList(); - } - - protected void before() throws Throwable { - start(); - } - - protected void after() { - stop(); - } - - /** - * Create a Kafka topic with 1 partition and a replication factor of 1. - * - * @param topic The name of the topic. - */ - public void createTopic(String topic) { - createTopic(topic, 1, 1, new Properties()); - } - - /** - * Create a Kafka topic with the given parameters. - * - * @param topic The name of the topic. - * @param partitions The number of partitions for this topic. - * @param replication The replication factor for (the partitions of) this topic. - */ - public void createTopic(String topic, int partitions, int replication) { - createTopic(topic, partitions, replication, new Properties()); - } - - /** - * Create a Kafka topic with the given parameters. - * - * @param topic The name of the topic. - * @param partitions The number of partitions for this topic. - * @param replication The replication factor for (partitions of) this topic. - * @param topicConfig Additional topic-level configuration settings. - */ - public void createTopic(String topic, - int partitions, - int replication, - Properties topicConfig) { - broker.createTopic(topic, partitions, replication, topicConfig); - } - - public void deleteTopic(String topic) { - broker.deleteTopic(topic); - } -}
