http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java deleted file mode 100644 index 8898284..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.streams.kafka; - -import static java.util.Objects.requireNonNull; -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; - -import com.google.common.collect.Sets; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * A set of utility functions that are useful when writing tests against a Kafka instance. - */ -@DefaultAnnotation(NonNull.class) -public final class KafkaTestUtil { - - private KafkaTestUtil() { } - - /** - * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka. - * - * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null) - * @param keySerializerClass - Serializes the keys. 
(not null) - * @param valueSerializerClass - Serializes the values. (not null) - * @return A {@link Producer} that can be used to write records to a topic. - */ - public static <K, V> Producer<K, V> makeProducer( - final KafkaTestInstanceRule kafka, - final Class<? extends Serializer<K>> keySerializerClass, - final Class<? extends Serializer<V>> valueSerializerClass) { - requireNonNull(kafka); - requireNonNull(keySerializerClass); - requireNonNull(valueSerializerClass); - - final Properties props = kafka.createBootstrapServerConfig(); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName()); - return new KafkaProducer<>(props); - } - - /** - * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an - * embedded instance of Kafka starting at the earliest point by default. - * - * @param kafka - The Kafka rule used to connect to the embedded Kafkfa instance. (not null) - * @param keyDeserializerClass - Deserializes the keys. (not null) - * @param valueDeserializerClass - Deserializes the values. (not null) - * @return A {@link Consumer} that can be used to read records from a topic. - */ - public static <K, V> Consumer<K, V> fromStartConsumer( - final KafkaTestInstanceRule kafka, - final Class<? extends Deserializer<K>> keyDeserializerClass, - final Class<? 
extends Deserializer<V>> valueDeserializerClass) { - requireNonNull(kafka); - requireNonNull(keyDeserializerClass); - requireNonNull(valueDeserializerClass); - - final Properties props = kafka.createBootstrapServerConfig(); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName()); - return new KafkaConsumer<>(props); - } - - /** - * Polls a {@link Consumer> until it has either polled too many times without hitting the target number - * of results, or it hits the target number of results. - * - * @param pollMs - How long each poll could take. - * @param pollIterations - The maximum number of polls that will be attempted. - * @param targetSize - The number of results to read before stopping. - * @param consumer - The consumer that will be polled. - * @return The results that were read frmo the consumer. - * @throws Exception If the poll failed. - */ - public static <K, V> List<V> pollForResults( - final int pollMs, - final int pollIterations, - final int targetSize, - final Consumer<K, V> consumer) throws Exception { - requireNonNull(consumer); - - final List<V> values = new ArrayList<>(); - - int i = 0; - while(values.size() < targetSize && i < pollIterations) { - for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) { - values.add( record.value() ); - } - i++; - } - - return values; - } - - /** - * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of - * the results topic, and ensures the expected results match the read results. - * - * @param <T> The type of value that will be consumed from the results topic. - * @param kafka - The embedded Kafka instance that is being tested with. 
(not null) - * @param statementsTopic - The topic statements will be written to. (not null) - * @param resultsTopic - The topic results will be read from. (not null) - * @param builder - The streams topology that will be executed. (not null) - * @param startupMs - How long to wait for the topology to start before writing the statements. - * @param statements - The statements that will be loaded into the topic. (not null) - * @param expected - The expected results. (not null) - * @param expectedDeserializerClass - The class of the deserializer that will be used when reading - * values from the results topic. (not null) - * @throws Exception If any exception was thrown while running the test. - */ - public static <T> void runStreamProcessingTest( - final KafkaTestInstanceRule kafka, - final String statementsTopic, - final String resultsTopic, - final TopologyBuilder builder, - final int startupMs, - final List<VisibilityStatement> statements, - final Set<T> expected, - final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception { - requireNonNull(kafka); - requireNonNull(statementsTopic); - requireNonNull(resultsTopic); - requireNonNull(builder); - requireNonNull(statements); - requireNonNull(expected); - requireNonNull(expectedDeserializerClass); - - // Explicitly create the topics that are being used. - kafka.createTopic(statementsTopic); - kafka.createTopic(resultsTopic); - - // Start the streams program. - final Properties props = kafka.createBootstrapServerConfig(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT"); - - final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props)); - streams.cleanUp(); - try { - streams.start(); - - // Wait for the streams application to start. Streams only see data after their consumers are connected. - Thread.sleep(startupMs); - - // Load the statements into the input topic. 
- try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( - kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { - new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements); - } - - // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found. - try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) { - // Register the topic. - consumer.subscribe(Arrays.asList(resultsTopic)); - - // Poll for the result. - final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); - - // Show the correct binding sets results from the job. - assertEquals(expected, results); - } - } finally { - streams.close(); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java new file mode 100644 index 0000000..b7e2be2 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RyaStreamsTestUtil.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; + +import com.google.common.collect.Sets; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Utility functions that make it easier to test Rya Streams applications. + */ +@DefaultAnnotation(NonNull.class) +public class RyaStreamsTestUtil { + + /** + * Runs a Kafka Streams topology, loads statements into the input topic, read the binding sets that come out of + * the results topic, and ensures the expected results match the read results. + * + * @param <T> The type of value that will be consumed from the results topic. + * @param kafka - The embedded Kafka instance that is being tested with. (not null) + * @param statementsTopic - The topic statements will be written to. (not null) + * @param resultsTopic - The topic results will be read from. (not null) + * @param builder - The streams topology that will be executed. 
(not null) + * @param startupMs - How long to wait for the topology to start before writing the statements. + * @param statements - The statements that will be loaded into the topic. (not null) + * @param expected - The expected results. (not null) + * @param expectedDeserializerClass - The class of the deserializer that will be used when reading + * values from the results topic. (not null) + * @throws Exception If any exception was thrown while running the test. + */ + public static <T> void runStreamProcessingTest( + final KafkaTestInstanceRule kafka, + final String statementsTopic, + final String resultsTopic, + final TopologyBuilder builder, + final int startupMs, + final List<VisibilityStatement> statements, + final Set<T> expected, + final Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception { + requireNonNull(kafka); + requireNonNull(statementsTopic); + requireNonNull(resultsTopic); + requireNonNull(builder); + requireNonNull(statements); + requireNonNull(expected); + requireNonNull(expectedDeserializerClass); + + // Explicitly create the topics that are being used. + kafka.createTopic(statementsTopic); + kafka.createTopic(resultsTopic); + + // Start the streams program. + final Properties props = kafka.createBootstrapServerConfig(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT"); + + final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props)); + streams.cleanUp(); + try { + streams.start(); + + // Wait for the streams application to start. Streams only see data after their consumers are connected. + Thread.sleep(startupMs); + + // Load the statements into the input topic. 
+ try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( + kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { + new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements); + } + + // Wait for the final results to appear in the output topic and verify the expected Binding Sets were found. + try(Consumer<String, T> consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, expectedDeserializerClass)) { + // Register the topic. + consumer.subscribe(Arrays.asList(resultsTopic)); + + // Poll for the result. + final Set<T> results = Sets.newHashSet( KafkaTestUtil.pollForResults(500, 6, expected.size(), consumer) ); + + // Show the correct binding sets results from the job. + assertEquals(expected, results); + } + } finally { + streams.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java index 67889e9..c740ba2 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java @@ -31,10 +31,10 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.interactor.GetQueryResultStream; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java index b48addd..7bfa560 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java @@ -31,11 +31,10 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; -import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; @@ -45,7 +44,7 @@ import org.openrdf.rio.UnsupportedRDFormatException; /** * Integration tests the {@link KafkaLoadStatements} command */ -public class KafkaLoadStatementsIT extends KafkaITBase { +public class KafkaLoadStatementsIT { private static final Path TURTLE_FILE = 
Paths.get("src/test/resources/statements.ttl"); private static final Path INVALID = Paths.get("src/test/resources/invalid.INVALID"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java new file mode 100644 index 0000000..33b3a92 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaRunQueryIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.interactor; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.LoadStatements; +import org.apache.rya.streams.api.interactor.RunQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link KafkaRunQuery}. 
+ */ +public class KafkaRunQueryIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + private Producer<String, VisibilityStatement> producer; + private Consumer<String, VisibilityBindingSet> consumer; + + @Before + public void setup() { + producer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class); + consumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class); + } + + @After + public void cleanup() { + producer.close(); + consumer.close(); + } + + @Test + public void runQuery() throws Exception { + // Setup some constant that will be used through the test. + final String ryaInstance = UUID.randomUUID().toString(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + + // This query is completely in memory, so it doesn't need to be closed. + final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog() ); + + // Add the query to the query repository. + final StreamsQuery sQuery = queries.add("SELECT * WHERE { ?person <urn:worksAt> ?business . }"); + final UUID queryId = sQuery.getQueryId(); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // The thread that will run the tested interactor. + final Thread testThread = new Thread() { + @Override + public void run() { + final RunQuery runQuery = new KafkaRunQuery( + kafka.getKafkaHostname(), + kafka.getKafkaPort(), + statementsTopic, + resultsTopic, + queries, + new TopologyFactory()); + try { + runQuery.run(queryId); + } catch (final RyaStreamsException e) { + // Do nothing. Test will still fail because the expected results will be missing. + } + } + }; + + // Create the topics. + kafka.createTopic(statementsTopic); + kafka.createTopic(resultsTopic); + + // Create the statements that will be loaded. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Alice"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:BurgerJoint")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Bob"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + statements.add(new VisibilityStatement(vf.createStatement( + vf.createURI("urn:Charlie"), + vf.createURI("urn:worksAt"), + vf.createURI("urn:TacoShop")), "a")); + + // Create the expected results. + final List<VisibilityBindingSet> expected = new ArrayList<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Charlie")); + bs.addBinding("business", vf.createURI("urn:TacoShop")); + expected.add(new VisibilityBindingSet(bs, "a")); + + // Execute the test. This will result in a set of results that were read from the results topic. + final List<VisibilityBindingSet> results; + try { + // Wait for the program to start. + testThread.start(); + Thread.sleep(2000); + + // Write some statements to the program. + final LoadStatements loadStatements = new KafkaLoadStatements(statementsTopic, producer); + loadStatements.fromCollection(statements); + + // Read the output of the streams program. + consumer.subscribe( Lists.newArrayList(resultsTopic) ); + results = KafkaTestUtil.pollForResults(500, 6, 3, consumer); + } finally { + // Tear down the test. 
+ testThread.interrupt(); + testThread.join(3000); + } + + // Show the read results matched the expected ones. + assertEquals(expected, results); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java index 3e0e64d..80b6e42 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java @@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -75,7 +75,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -113,7 +113,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a|b") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -147,7 +147,7 @@ public class StatementPatternProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -190,6 +190,6 @@ public class StatementPatternProcessorIT { expected.add(new VisibilityBindingSet(bs, "a")); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java index 0348dcd..fb5305f 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java @@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -81,6 +81,6 @@ public class FilterProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java index b137a9a..51bb0ae 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -29,8 +29,8 @@ import org.apache.rya.api.function.join.NaturalJoin; import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; import org.apache.rya.streams.kafka.processors.ProcessorResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor; @@ -111,7 +111,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -162,7 +162,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -219,7 +219,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "a&c") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -260,7 +260,7 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 3000, statements, expected, VisibilityBindingSetDeserializer.class); } @Test @@ -311,6 +311,6 @@ public class JoinProcessorIT { expected.add( new VisibilityBindingSet(bs, "c") ); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java index d71577b..c96919c 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java @@ -26,8 +26,8 @@ import java.util.UUID; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -87,6 +87,6 @@ public class MultiProjectionProcessorIT { expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a")); // Run the test. 
- KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityStatementDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java index bc5f115..63c2cc7 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java @@ -28,8 +28,8 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.function.projection.RandomUUIDFactory; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; import org.apache.rya.streams.kafka.topology.TopologyFactory; @@ -80,6 +80,6 @@ public class ProjectionProcessorIT { expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob")); expected.add(new VisibilityBindingSet(expectedBs, "a")); - 
KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class); + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java index ff2b59b..04c81ed 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java @@ -33,11 +33,11 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.queries.ChangeLogEntry; import org.apache.rya.streams.api.queries.QueryChange; import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaITBase; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.After; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java 
---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java index f9129ff..70cba1c 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java @@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java index b85eb0c..9e85f52 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java @@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import 
org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.apache.rya.test.kafka.KafkaTestUtil; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/test/kafka/pom.xml b/test/kafka/pom.xml index 44773a7..8492629 100644 --- a/test/kafka/pom.xml +++ b/test/kafka/pom.xml @@ -70,6 +70,11 @@ under the License. </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + </dependency> <!-- Testing dependencies. --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java index f76fa2b..252c288 100644 --- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java @@ -105,6 +105,13 @@ public class KafkaTestInstanceRule extends ExternalResource { } /** + * @return The hostnames of the Zookeeper servers. + */ + public String getZookeeperServers() { + return kafkaInstance.getZookeeperConnect(); + } + + /** * @return The hostname of the Kafka Broker. 
*/ public String getKafkaHostname() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/94423229/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java new file mode 100644 index 0000000..4b41f1a --- /dev/null +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.test.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A set of utility functions that are useful when writing tests against a Kafka instance. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaTestUtil { + + private KafkaTestUtil() { } + + /** + * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka. + * + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) + * @param keySerializerClass - Serializes the keys. (not null) + * @param valueSerializerClass - Serializes the values. (not null) + * @return A {@link Producer} that can be used to write records to a topic. + */ + public static <K, V> Producer<K, V> makeProducer( + final KafkaTestInstanceRule kafka, + final Class<? extends Serializer<K>> keySerializerClass, + final Class<? 
extends Serializer<V>> valueSerializerClass) { + requireNonNull(kafka); + requireNonNull(keySerializerClass); + requireNonNull(valueSerializerClass); + + final Properties props = kafka.createBootstrapServerConfig(); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName()); + return new KafkaProducer<>(props); + } + + /** + * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an + * embedded instance of Kafka starting at the earliest point by default. + * + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) + * @param keyDeserializerClass - Deserializes the keys. (not null) + * @param valueDeserializerClass - Deserializes the values. (not null) + * @return A {@link Consumer} that can be used to read records from a topic. + */ + public static <K, V> Consumer<K, V> fromStartConsumer( + final KafkaTestInstanceRule kafka, + final Class<? extends Deserializer<K>> keyDeserializerClass, + final Class<? extends Deserializer<V>> valueDeserializerClass) { + requireNonNull(kafka); + requireNonNull(keyDeserializerClass); + requireNonNull(valueDeserializerClass); + + final Properties props = kafka.createBootstrapServerConfig(); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName()); + return new KafkaConsumer<>(props); + } + + /** + * Polls a {@link Consumer} until it has either polled too many times without hitting the target number + * of results, or it hits the target number of results. + * + * @param pollMs - How long each poll could take. 
+ * @param pollIterations - The maximum number of polls that will be attempted. + * @param targetSize - The number of results to read before stopping. + * @param consumer - The consumer that will be polled. + * @return The results that were read from the consumer. + * @throws Exception If the poll failed. + */ + public static <K, V> List<V> pollForResults( + final int pollMs, + final int pollIterations, + final int targetSize, + final Consumer<K, V> consumer) throws Exception { + requireNonNull(consumer); + + final List<V> values = new ArrayList<>(); + + int i = 0; + while(values.size() < targetSize && i < pollIterations) { + for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) { + values.add( record.value() ); + } + i++; + } + + return values; + } +} \ No newline at end of file