Repository: metron Updated Branches: refs/heads/master acab9436d -> 39e22fa1b
METRON-1571 Correct KAFKA_TAIL Seek to End Logic (nickwallen) closes apache/metron#1023 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/39e22fa1 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/39e22fa1 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/39e22fa1 Branch: refs/heads/master Commit: 39e22fa1b343b08fe2dcb0debdb78df5e3909a0a Parents: acab943 Author: nickwallen <n...@nickallen.org> Authored: Fri Jun 1 15:31:37 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Fri Jun 1 15:31:37 2018 -0400 ---------------------------------------------------------------------- .../CURRENT/package/scripts/metron_service.py | 5 + .../metron/management/KafkaFunctions.java | 298 ++++++++++++++----- .../KafkaFunctionsIntegrationTest.java | 236 +++++++++++++-- 3 files changed, 428 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py index 2478b8b..894ba44 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py @@ -104,6 +104,11 @@ def build_global_config_patch(params, patch_file): "op": "add", "path": "/user.settings.hbase.cf", "value": "{{user_settings_hbase_cf}}" + }, + { + 
"op": "add", + "path": "/bootstrap.servers", + "value": "{{kafka_brokers}}" } ] """ http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java index 1b0af51..316e19d 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java @@ -18,73 +18,115 @@ package org.apache.metron.management; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.metron.common.system.Clock; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.ParseException; import org.apache.metron.stellar.dsl.Stellar; import org.apache.metron.stellar.dsl.StellarFunction; -import org.apache.metron.stellar.common.utils.ConversionUtils; - -import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import 
java.util.Set; import java.util.concurrent.ExecutionException; +import static java.lang.String.format; import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG; /** - * Kafka functions available in Stellar. + * Defines the following Kafka-related functions available in Stellar. * - * KAFKA_GET - * KAFKA_TAIL - * KAFKA_PUT - * KAFKA_PROPS + * KAFKA_GET + * KAFKA_PUT + * KAFKA_TAIL + * KAFKA_PROPS */ public class KafkaFunctions { - /** - * How long to wait on each poll request in milliseconds. There will be multiple - * poll requests, each waiting this period of time. The maximum amount of time - * that the user is willing to wait for a message will be some multiple of this value. - */ - private static final int POLL_TIMEOUT = 1000; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** * The key for the property that defines the maximum amount of time * to wait to receive messages. */ - private static final String MAX_WAIT_PROPERTY = "stellar.kafka.max.wait"; + public static final String POLL_TIMEOUT_PROPERTY = "stellar.kafka.poll.timeout"; /** - * Maintaining the default Kafka properties as a static class member is - * critical to consumer offset management. + * How long to wait on each poll request in milliseconds. * - * A unique 'group.id' is generated when creating these default properties. This - * value is used when storing Kafka consumer offsets. Multiple executions of any - * KAFKA_* functions, within the same Stellar REPL session, must maintain the same - * 'group.id'. At the same time, different Stellar REPL sessions running - * simultaneously should each have their own 'group.id'. + * <p>On each function call, there will likely be multiple poll requests, each + * waiting this period of time. + */ + private static final int DEFAULT_POLL_TIMEOUT = 500; + + /** + * The key for the property that defines the maximum amount of time + * to wait to receive messages in milliseconds.
+ */ + public static final String MAX_WAIT_PROPERTY = "stellar.kafka.max.wait.millis"; + + /** + * The default max wait time in milliseconds. + */ + public static final int DEFAULT_MAX_WAIT = 5000; + + /** + * The default set of Kafka properties. */ private static Properties defaultProperties = defaultKafkaProperties(); /** + * A clock to tell time. + * + * Allows any functions that depend on the system clock to be more readily tested. + */ + protected static Clock clock = new Clock(); + + /** * KAFKA_GET * - * Retrieves messages from a Kafka topic. Subsequent calls will continue retrieving messages + * <p>Retrieves messages from a Kafka topic. Subsequent calls will continue retrieving messages * sequentially from the original offset. * - * Example: Retrieve one message from a topic. - * KAFKA_GET('topic') + * <p>Example: Retrieve one message from a topic. + * <pre> + * {@code + * KAFKA_GET('topic') + * } + * </pre> * - * Example: Retrieve 10 messages from a topic. - * KAFKA_GET('topic', 10) + * <p>Example: Retrieve 10 messages from a topic. + * <pre> + * {@code + * KAFKA_GET('topic', 10) + * } + * </pre> * - * Example: Retrieve the first message from a topic. This must be the first retrieval + * <p>Example: Retrieve the first message from a topic. This must be the first retrieval * from the topic, otherwise the messages will be retrieved starting from the * previously stored consumer offset. - * KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" }) + * <pre> + * {@code + * KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" }) + * } + * </pre> */ @Stellar( namespace = "KAFKA", @@ -96,13 +138,12 @@ public class KafkaFunctions { "count - The number of Kafka messages to retrieve", "config - Optional map of key/values that override any global properties." 
}, - returns = "List of strings" + returns = "The messages as a list of strings" ) public static class KafkaGet implements StellarFunction { @Override public Object apply(List<Object> args, Context context) throws ParseException { - List<String> messages = new ArrayList<>(); // required - name of the topic to retrieve messages from String topic = ConversionUtils.convert(args.get(0), String.class); @@ -123,15 +164,43 @@ public class KafkaFunctions { Properties properties = buildKafkaProperties(overrides, context); properties.put("max.poll.records", count); + return getMessages(topic, count, properties); + } + + /** + * Gets messages from a Kafka topic. + * + * @param topic The Kafka topic. + * @param count The maximum number of messages to get. + * @param properties The function properties. + * @return + */ + private Object getMessages(String topic, int count, Properties properties) { + + int maxWait = getMaxWait(properties); + int pollTimeout = getPollTimeout(properties); + List<Object> messages = new ArrayList<>(); + // read some messages try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { - consumer.subscribe(Arrays.asList(topic)); - int maxAttempts = getMaxAttempts(properties); - int i = 0; - while(messages.size() < count && i++ < maxAttempts) { - consumer.poll(POLL_TIMEOUT).forEach(record -> messages.add(record.value())); + manualPartitionAssignment(topic, consumer); + + // continue until we have enough messages or exceeded the max wait time + long wait = 0L; + final long start = clock.currentTimeMillis(); + while(messages.size() < count && wait < maxWait) { + + for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { + messages.add(record.value()); + } + + // how long have we waited? 
+ wait = clock.currentTimeMillis() - start; consumer.commitSync(); + + LOG.debug("KAFKA_GET polled for messages; topic={}, count={}, waitTime={} ms", + topic, messages.size(), wait); } } @@ -153,31 +222,37 @@ public class KafkaFunctions { /** * KAFKA_TAIL * - * Retrieves messages from a Kafka topic always starting with - * the most recent message first. + * <p>Tails messages from a Kafka topic always starting with the most recently received message. * - * Example: Retrieve the latest message from a topic. - * KAFKA_TAIL('topic') + * <p>Example: Retrieve the latest message from a topic. + * <pre> + * {@code + * KAFKA_TAIL('topic') + * } + * </pre> * - * Example: Retrieve 10 messages from a topic starting with the latest. - * KAFKA_TAIL('topic', 10) + * <p>Example: Retrieve 10 messages from a topic starting with the latest. + * <pre> + * {@code + * KAFKA_TAIL('topic', 10) + * } + * </pre> */ @Stellar( namespace = "KAFKA", name = "TAIL", - description = "Retrieves messages from a Kafka topic always starting with the most recent message first.", + description = "Tails messages from a Kafka topic always starting with the most recently received message.", params = { "topic - The name of the Kafka topic", "count - The number of Kafka messages to retrieve", "config - Optional map of key/values that override any global properties." 
}, - returns = "Messages retrieved from the Kafka topic" + returns = "The messages as a list of strings" ) public static class KafkaTail implements StellarFunction { @Override public Object apply(List<Object> args, Context context) throws ParseException { - List<String> messages = new ArrayList<>(); // required - name of the topic to retrieve messages from String topic = ConversionUtils.convert(args.get(0), String.class); @@ -194,21 +269,48 @@ public class KafkaFunctions { overrides = ConversionUtils.convert(args.get(2), Map.class); } - // build the properties for kafka Properties properties = buildKafkaProperties(overrides, context); + properties.put("max.poll.records", count); + + return tailMessages(topic, count, properties); + } + + /** + * Gets messages from the tail end of a Kafka topic. + * + * @param topic The name of the kafka topic. + * @param count The maximum number of messages to get. + * @param properties The function configuration properties. + * @return A list of messages from the tail end of a Kafka topic. 
+ */ + private Object tailMessages(String topic, int count, Properties properties) { - // ensures messages pulled from latest offset, versus a previously stored consumer offset - properties.put("group.id", generateGroupId()); - properties.put("auto.offset.reset", "latest"); + List<Object> messages = new ArrayList<>(); + int pollTimeout = getPollTimeout(properties); + int maxWait = getMaxWait(properties); + // create the consumer try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) { - consumer.subscribe(Arrays.asList(topic)); - int maxAttempts = getMaxAttempts(properties); - int i = 0; - while(messages.size() < count && i++ < maxAttempts) { - consumer.poll(POLL_TIMEOUT).forEach(record -> messages.add(record.value())); + // seek to the end of all topic/partitions + Set<TopicPartition> partitions = manualPartitionAssignment(topic, consumer); + consumer.seekToEnd(partitions); + + // continue until we have enough messages or exceeded the max wait time + long wait = 0L; + final long start = clock.currentTimeMillis(); + while(messages.size() < count && wait < maxWait) { + + for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) { + messages.add(record.value()); + } + + // how long have we waited? + wait = clock.currentTimeMillis() - start; consumer.commitSync(); + + LOG.debug("KAFKA_TAIL polled for messages; topic={}, count={}, waitTime={} ms", + topic, messages.size(), wait); } } @@ -353,6 +455,31 @@ public class KafkaFunctions { } /** + * Manually assigns all partitions in a topic to a consumer + * + * @param topic The topic whose partitions will be assigned. + * @param consumer The consumer to assign partitions to. + * @return A set of topic-partitions that were manually assigned to the consumer. 
+ */ + private static Set<TopicPartition> manualPartitionAssignment(String topic, KafkaConsumer<String, String> consumer) { + + // find all partitions for the topic + Set<TopicPartition> partitions = new HashSet<>(); + for(PartitionInfo partition : consumer.partitionsFor(topic)) { + partitions.add(new TopicPartition(topic, partition.partition())); + } + + if(partitions.size() == 0) { + throw new IllegalStateException(format("No partitions available for consumer assignment; topic=%s", topic)); + } + + // manually assign this consumer to each partition in the topic + consumer.assign(partitions); + + return partitions; + } + + /** * Assembles the set of Properties required by the Kafka client. * * A set of default properties has been defined to provide minimum functionality. @@ -382,13 +509,40 @@ public class KafkaFunctions { } /** - * Determine how many poll attempts should be made based on the user's patience. - * @param properties - * @return The maximum number of poll attempts to make. + * Return the max wait time setting. + * + * @param properties The function configuration properties. + * @return The max wait time in milliseconds. */ - private static int getMaxAttempts(Properties properties) { - int maxWait = ConversionUtils.convert(properties.get(MAX_WAIT_PROPERTY), Integer.class); - return maxWait / POLL_TIMEOUT; + private static int getMaxWait(Properties properties) { + int maxWait = DEFAULT_MAX_WAIT; + + Object value = properties.get(MAX_WAIT_PROPERTY); + if(value != null) { + maxWait = ConversionUtils.convert(value, Integer.class); + } + + return maxWait; + } + + /** + * Returns the poll timeout setting. + * + * <p>The maximum amount of time waited each time that Kafka is polled + * for messages. + * + * @param properties The function configuration properties.
+ * @return + */ + private static int getPollTimeout(Properties properties) { + int pollTimeout = DEFAULT_POLL_TIMEOUT; + + Object value = properties.get(POLL_TIMEOUT_PROPERTY); + if(value != null) { + pollTimeout = ConversionUtils.convert(value, Integer.class); + } + + return pollTimeout; } /** @@ -399,15 +553,7 @@ public class KafkaFunctions { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); - - /* - * A unique 'group.id' is generated when creating these default properties. This - * value is used when storing Kafka consumer offsets. Multiple executions of any - * KAFKA_* functions, within the same Stellar REPL session, must maintain the same - * 'group.id'. At the same time, different Stellar REPL sessions running - * simultaneously should each have their own 'group.id'. - */ - properties.put("group.id", generateGroupId()); + properties.put("group.id", "kafka-functions-stellar"); /* * What to do when there is no initial offset in Kafka or if the current @@ -431,16 +577,12 @@ public class KafkaFunctions { properties.put("key.serializer", StringSerializer.class.getName()); properties.put("value.serializer", StringSerializer.class.getName()); - // the maximum time to wait for messages - properties.put(MAX_WAIT_PROPERTY, 5000); + // set the default max time to wait for messages + properties.put(MAX_WAIT_PROPERTY, DEFAULT_MAX_WAIT); - return properties; - } + // set the default poll timeout + properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT); - /** - * Generates a unique 'group.id' for a session. 
- */ - private static String generateGroupId() { - return String.format("stellar-shell-%s", UUID.randomUUID().toString()); + return properties; } } http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java index 7a42d8b..74c6705 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java @@ -18,20 +18,23 @@ package org.apache.metron.management; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.DefaultVariableResolver; -import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; -import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; -import org.apache.metron.stellar.common.StellarProcessor; import org.apache.metron.integration.BaseIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.metron.stellar.common.StellarProcessor; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.DefaultVariableResolver; +import org.apache.metron.stellar.dsl.functions.MapFunctions; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import 
org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import java.util.ArrayList; import java.util.Collections; @@ -39,7 +42,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -55,11 +61,28 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { private static final String message2 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 23 }"; private static final String message3 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 29011 }"; - private static Map<String, Object> variables = new HashMap<>(); + private static Map<String, Object> variables; private static ZKServerComponent zkServerComponent; private static KafkaComponent kafkaComponent; private static ComponentRunner runner; private static Properties global; + private static FunctionResolver functionResolver; + private static ExecutorService executor; + + @Rule + public TestName testName = new TestName(); + + @BeforeClass + public static void setupExecutor() { + executor = Executors.newFixedThreadPool(2); + } + + @AfterClass + public static void tearDownExecutor() { + if(executor != null && !executor.isShutdown()) { + executor.shutdown(); + } + } @BeforeClass public static void setupKafka() throws Exception { @@ -67,7 +90,6 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Properties properties = new Properties(); zkServerComponent = getZKServerComponent(properties); kafkaComponent = getKafkaComponent(properties, new ArrayList<>()); - runner = new ComponentRunner.Builder() .withComponent("zk", zkServerComponent) .withComponent("kafka", kafkaComponent) @@ -78,10 +100,23 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { runner.start(); } 
+ @BeforeClass + public static void setupFunctionResolver() { + + // used when executing Stellar expressions + functionResolver = new SimpleFunctionResolver() + .withClass(KafkaFunctions.KafkaGet.class) + .withClass(KafkaFunctions.KafkaPut.class) + .withClass(KafkaFunctions.KafkaProps.class) + .withClass(KafkaFunctions.KafkaTail.class) + .withClass(MapFunctions.MapGet.class); + } + @Before public void setup() { // messages that will be read/written during the tests + variables = new HashMap<>(); variables.put("message1", message1); variables.put("message2", message2); variables.put("message3", message3); @@ -105,23 +140,44 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** - * Write one message, read one message. + * KAFKA_PUT should be able to write one message to a topic. + * KAFKA_GET should be able to read one message from a topic. */ @Test - public void testOneMessage() { - run("KAFKA_PUT('topic1', [message1])"); - Object actual = run("KAFKA_GET('topic1')"); + public void testKafkaPut() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put a message onto the topic + run("KAFKA_PUT(topic, [message1])"); + + // get a message from the topic + Object actual = run("KAFKA_GET(topic)"); + + // validate assertEquals(Collections.singletonList(message1), actual); } /** - * Multiple messages can be read with a single call. + * KAFKA_PUT should be able to write multiple messages passed as a List. + * KAFKA_GET should be able to read multiple messages at once. 
*/ @Test - public void testMultipleMessages() { - run("KAFKA_PUT('topic2', [message1, message2, message3])"); - Object actual = run("KAFKA_GET('topic2', 3)"); + public void testKafkaPutThenGetWithMultipleMessages() { + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic + run("KAFKA_PUT(topic, [message1, message2, message3])"); + + // get 3 messages from the topic + Object actual = run("KAFKA_GET(topic, 3)"); + + // validate that all 3 messages were read List<String> expected = new ArrayList<String>() {{ add(message1); add(message2); @@ -131,20 +187,99 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_GET should maintain its consumer offsets and reuse them across subsequent calls. + * * Does the client maintain the consumer offset correctly? + * + * The offsets must be maintained correctly to read messages sequentially, in order + * across separate executions of KAFKA_GET */ @Test - public void testConsumerOffsets() { - run("KAFKA_PUT('topic3', [message1, message2, message3])"); + public void testKafkaGetWithSequentialReads() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic + run("KAFKA_PUT(topic, [message1, message2, message3])"); - // the offsets must be maintained correctly for us to read each message, in order, - // sequentially across separate calls to KAFKA_GET - assertEquals(Collections.singletonList(message1), run("KAFKA_GET('topic3', 1)")); - assertEquals(Collections.singletonList(message2), run("KAFKA_GET('topic3', 1)")); - assertEquals(Collections.singletonList(message3), run("KAFKA_GET('topic3', 1)")); + // read the first message + assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic, 1)")); + + // pick-up from where we left off and read the
second message + assertEquals(Collections.singletonList(message2), run("KAFKA_GET(topic, 1)")); + + // pick-up from where we left off and read the third message + assertEquals(Collections.singletonList(message3), run("KAFKA_GET(topic, 1)")); + + // no more messages left to read + assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)")); } /** + * KAFKA_GET should return nothing if a topic does not exist + */ + @Test + public void testKafkaGetWithNonExistentTopic() { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // the topic does not exist, so there are no messages to read + assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)")); + } + + /** + * KAFKA_TAIL should return new messages from the end of a topic. + */ + @Test + public void testKafkaTail() throws Exception { + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic; KAFKA_TAIL should NOT retrieve these + run("KAFKA_PUT(topic, [message2, message2, message2])"); + + // get a message from the topic; will block until messages arrive + Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)"); + + // put 10 messages onto the topic for KAFKA_TAIL to grab + runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])")); + + // expect to receive message1, which were added to the topic while KAFKA_TAIL was running + Object actual = tailFuture.get(10, TimeUnit.SECONDS); + List<String> expected = Collections.singletonList(message1); + assertEquals(expected, actual); + } + + /** + * KAFKA_TAIL should always seek to end of the topic. If no messages arrive after the 'seek to end' + * then no messages will be returned.
+ */ + @Test + public void testKafkaTailNone() { + + // shorten the max wait time so we do not have to wait so long + global.put(KafkaFunctions.MAX_WAIT_PROPERTY, 2000); + + // use a unique topic name for this test + final String topicName = testName.getMethodName(); + variables.put("topic", topicName); + + // put multiple messages onto the topic + run("KAFKA_PUT(topic, [message1, message2, message3])"); + + // no messages to read as KAFKA_TAIL should "seek to end" of the topic + assertEquals(Collections.emptyList(), run("KAFKA_TAIL(topic, 1)")); + } + + /** + * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer + * * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user * properties into a single set of properties. The global properties should override any default properties. */ @@ -162,6 +297,8 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { } /** + * KAFKA_PROPS should allow the global properties to be overridden + * * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user * properties into a single set of properties. The user properties should override any default or global properties. */ @@ -180,26 +317,59 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest { Map<String, String> properties = (Map<String, String>) run(expression); assertEquals(expected, properties.get(overriddenKey)); } - + /** * Runs a Stellar expression. - * @param expr The expression to run. + * @param expression The expression to run. 
*/ - private Object run(String expr) { + private Object run(String expression) { // make the global properties available to the function Context context = new Context.Builder() .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) .build(); - FunctionResolver functionResolver = new SimpleFunctionResolver() - .withClass(KafkaFunctions.KafkaGet.class) - .withClass(KafkaFunctions.KafkaPut.class) - .withClass(KafkaFunctions.KafkaProps.class) - .withClass(KafkaFunctions.KafkaTail.class); - + // execute the expression StellarProcessor processor = new StellarProcessor(); - return processor.parse(expr, new DefaultVariableResolver(x -> variables.get(x),x -> variables.containsKey(x)), functionResolver, context); + return processor.parse(expression, + new DefaultVariableResolver( + x -> variables.get(x), + x -> variables.containsKey(x)), + functionResolver, + context); } + /** + * Runs a Stellar expression asynchronously. + * + * <p>Does not block until the expression completes. + * + * @param expression The expression to run. + * @return The result of executing the expression. + */ + private Future<Object> runAsync(String expression) { + return executor.submit(() -> run(expression)); + } + + /** + * Runs a set of Stellar expression asynchronously and waits + * for each to complete before returning. + * + * @param expressions The expressions to complete. + * @throws Exception + */ + private void runAsyncAndWait(Iterable<String> expressions) throws Exception { + + // put multiple messages onto the topic asynchronously for KAFKA_TAIL to grab + List<Future<Object>> putFutures = new ArrayList<>(); + for(String expression: expressions) { + Future<Object> future = runAsync(expression); + putFutures.add(future); + } + + // wait for the puts to complete + for(Future<Object> future: putFutures) { + future.get(5, TimeUnit.SECONDS); + } + } }