Repository: metron
Updated Branches:
  refs/heads/master acab9436d -> 39e22fa1b


METRON-1571 Correct KAFKA_TAIL Seek to End Logic (nickwallen) closes apache/metron#1023


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/39e22fa1
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/39e22fa1
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/39e22fa1

Branch: refs/heads/master
Commit: 39e22fa1b343b08fe2dcb0debdb78df5e3909a0a
Parents: acab943
Author: nickwallen <n...@nickallen.org>
Authored: Fri Jun 1 15:31:37 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Fri Jun 1 15:31:37 2018 -0400

----------------------------------------------------------------------
 .../CURRENT/package/scripts/metron_service.py   |   5 +
 .../metron/management/KafkaFunctions.java       | 298 ++++++++++++++-----
 .../KafkaFunctionsIntegrationTest.java          | 236 +++++++++++++--
 3 files changed, 428 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
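
The heart of the fix: rather than relying on a fresh 'group.id' and 'auto.offset.reset=latest', KAFKA_TAIL now manually assigns all of a topic's partitions to the consumer and explicitly seeks to the end of each before polling. The sketch below condenses that flow for reference; the class name and signature are illustrative, not the committed source, which appears in full in the diff that follows.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class KafkaTailSketch {

  // Condensed, illustrative rendering of the corrected KAFKA_TAIL flow.
  public static List<String> tail(KafkaConsumer<String, String> consumer,
                                  String topic, int count,
                                  long pollTimeoutMs, long maxWaitMs) {

    // manually assign every partition of the topic to this consumer,
    // rather than subscribing and waiting on group coordination
    Set<TopicPartition> partitions = new HashSet<>();
    for (PartitionInfo info : consumer.partitionsFor(topic)) {
      partitions.add(new TopicPartition(topic, info.partition()));
    }
    consumer.assign(partitions);

    // the fix: explicitly seek to the end of each assigned partition so
    // that only messages arriving after this call are returned
    consumer.seekToEnd(partitions);

    // poll until enough messages arrive or the max wait time elapses
    List<String> messages = new ArrayList<>();
    final long start = System.currentTimeMillis();
    while (messages.size() < count && System.currentTimeMillis() - start < maxWaitMs) {
      for (ConsumerRecord<String, String> record : consumer.poll(pollTimeoutMs)) {
        messages.add(record.value());
      }
      consumer.commitSync();
    }
    return messages;
  }
}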


http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 2478b8b..894ba44 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -104,6 +104,11 @@ def build_global_config_patch(params, patch_file):
         "op": "add",
         "path": "/user.settings.hbase.cf",
         "value": "{{user_settings_hbase_cf}}"
+    },
+    {
+        "op": "add",
+        "path": "/bootstrap.servers",
+        "value": "{{kafka_brokers}}"
     }
   ]
   """

http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 1b0af51..316e19d 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -18,73 +18,115 @@
 
 package org.apache.metron.management;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.stellar.dsl.Stellar;
 import org.apache.metron.stellar.dsl.StellarFunction;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-
-import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import static java.lang.String.format;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
 
 /**
- * Kafka functions available in Stellar.
+ * Defines the following Kafka-related functions available in Stellar.
  *
- * KAFKA_GET
- * KAFKA_TAIL
- * KAFKA_PUT
- * KAFKA_PROPS
+ *  KAFKA_GET
+ *  KAFKA_PUT
+ *  KAFKA_TAIL
+ *  KAFKA_PROPS
  */
 public class KafkaFunctions {
 
-  /**
-   * How long to wait on each poll request in milliseconds.  There will be multiple
-   * poll requests, each waiting this period of time.  The maximum amount of time
-   * that the user is willing to wait for a message will be some multiple of this value.
-   */
-  private static final int POLL_TIMEOUT = 1000;
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   /**
    * The key for the property that defines the maximum amount of time
    * to wait to receive messages.
    */
-  private static final String MAX_WAIT_PROPERTY = "stellar.kafka.max.wait";
+  public static final String POLL_TIMEOUT_PROPERTY = "stellar.kafka.poll.timeout";
 
   /**
-   * Maintaining the default Kafka properties as a static class member is
-   * critical to consumer offset management.
+   * How long to wait on each poll request in milliseconds.
    *
-   * A unique 'group.id' is generated when creating these default properties.  This
-   * value is used when storing Kafka consumer offsets.  Multiple executions of any
-   * KAFKA_* functions, within the same Stellar REPL session, must maintain the same
-   * 'group.id'.  At the same time, different Stellar REPL sessions running
-   * simultaneously should each have their own 'group.id'.
+   * <p>On each function call, there will likely be multiple poll requests, each
+   * waiting this period of time.
+   */
+  private static final int DEFAULT_POLL_TIMEOUT = 500;
+
+  /**
+   * The key for the property that defines the maximum amount of time
+   * to wait to receive messages in milliseconds.
+   */
+  public static final String MAX_WAIT_PROPERTY = "stellar.kafka.max.wait.millis";
+
+  /**
+   * The default max wait time in milliseconds.
+   */
+  public static final int DEFAULT_MAX_WAIT = 5000;
+
+  /**
+   * The default set of Kafka properties.
    */
   private static Properties defaultProperties = defaultKafkaProperties();
 
   /**
+   * A clock to tell time.
+   *
+   * Allows any functions that depend on the system clock to be more readily tested.
+   */
+  protected static Clock clock = new Clock();
+
+  /**
    * KAFKA_GET
    *
-   * Retrieves messages from a Kafka topic.  Subsequent calls will continue retrieving messages
+   * <p>Retrieves messages from a Kafka topic.  Subsequent calls will continue retrieving messages
    * sequentially from the original offset.
    *
-   * Example: Retrieve one message from a topic.
-   *  KAFKA_GET('topic')
+   * <p>Example: Retrieve one message from a topic.
+   * <pre>
+   *   {@code
+   *   KAFKA_GET('topic')
+   *   }
+   * </pre>
    *
-   * Example: Retrieve 10 messages from a topic.
-   *  KAFKA_GET('topic', 10)
+   * <p>Example: Retrieve 10 messages from a topic.
+   * <pre>
+   *   {@code
+   *   KAFKA_GET('topic', 10)
+   *   }
+   * </pre>
    *
-   * Example: Retrieve the first message from a topic.  This must be the first retrieval
+   * <p>Example: Retrieve the first message from a topic.  This must be the first retrieval
    * from the topic, otherwise the messages will be retrieved starting from the
    * previously stored consumer offset.
-   *  KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" })
+   * <pre>
+   *   {@code
+   *   KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" })
+   *   }
+   * </pre>
    */
   @Stellar(
           namespace = "KAFKA",
@@ -96,13 +138,12 @@ public class KafkaFunctions {
                   "count - The number of Kafka messages to retrieve",
                   "config - Optional map of key/values that override any 
global properties."
           },
-          returns = "List of strings"
+          returns = "The messages as a list of strings"
   )
   public static class KafkaGet implements StellarFunction {
 
     @Override
    public Object apply(List<Object> args, Context context) throws ParseException {
-      List<String> messages = new ArrayList<>();
 
       // required - name of the topic to retrieve messages from
       String topic = ConversionUtils.convert(args.get(0), String.class);
@@ -123,15 +164,43 @@ public class KafkaFunctions {
       Properties properties = buildKafkaProperties(overrides, context);
       properties.put("max.poll.records", count);
 
+      return getMessages(topic, count, properties);
+    }
+
+    /**
+     * Gets messages from a Kafka topic.
+     *
+     * @param topic The Kafka topic.
+     * @param count The maximum number of messages to get.
+     * @param properties The function properties.
+     * @return The messages as a list of strings.
+     */
+    private Object getMessages(String topic, int count, Properties properties) {
+
+      int maxWait = getMaxWait(properties);
+      int pollTimeout = getPollTimeout(properties);
+      List<Object> messages = new ArrayList<>();
+
       // read some messages
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
-        consumer.subscribe(Arrays.asList(topic));
 
-        int maxAttempts = getMaxAttempts(properties);
-        int i = 0;
-        while(messages.size() < count && i++ < maxAttempts) {
-          consumer.poll(POLL_TIMEOUT).forEach(record -> messages.add(record.value()));
+        manualPartitionAssignment(topic, consumer);
+
+        // continue until we have enough messages or exceeded the max wait time
+        long wait = 0L;
+        final long start = clock.currentTimeMillis();
+        while(messages.size() < count && wait < maxWait) {
+
+          for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
+            messages.add(record.value());
+          }
+
+          // how long have we waited?
+          wait = clock.currentTimeMillis() - start;
           consumer.commitSync();
+
+          LOG.debug("KAFKA_GET polled for messages; topic={}, count={}, 
waitTime={} ms",
+                  topic, messages.size(), wait);
         }
       }
 
@@ -153,31 +222,37 @@ public class KafkaFunctions {
   /**
    * KAFKA_TAIL
    *
-   * Retrieves messages from a Kafka topic always starting with
-   * the most recent message first.
+   * <p>Tails messages from a Kafka topic always starting with the most recently received message.
    *
-   * Example: Retrieve the latest message from a topic.
-   *  KAFKA_TAIL('topic')
+   * <p>Example: Retrieve the latest message from a topic.
+   * <pre>
+   *   {@code
+   *   KAFKA_TAIL('topic')
+   *   }
+   * </pre>
    *
-   * Example: Retrieve 10 messages from a topic starting with the latest.
-   *  KAFKA_TAIL('topic', 10)
+   * <p>Example: Retrieve 10 messages from a topic starting with the latest.
+   * <pre>
+   *   {@code
+   *   KAFKA_TAIL('topic', 10)
+   *   }
+   * </pre>
    */
   @Stellar(
           namespace = "KAFKA",
           name = "TAIL",
-          description = "Retrieves messages from a Kafka topic always starting 
with the most recent message first.",
+          description = "Tails messages from a Kafka topic always starting 
with the most recently received message.",
           params = {
                   "topic - The name of the Kafka topic",
                   "count - The number of Kafka messages to retrieve",
                   "config - Optional map of key/values that override any 
global properties."
           },
-          returns = "Messages retrieved from the Kafka topic"
+          returns = "The messages as a list of strings"
   )
   public static class KafkaTail implements StellarFunction {
 
     @Override
    public Object apply(List<Object> args, Context context) throws ParseException {
-      List<String> messages = new ArrayList<>();
 
       // required - name of the topic to retrieve messages from
       String topic = ConversionUtils.convert(args.get(0), String.class);
@@ -194,21 +269,48 @@ public class KafkaFunctions {
         overrides = ConversionUtils.convert(args.get(2), Map.class);
       }
 
-      // build the properties for kafka
       Properties properties = buildKafkaProperties(overrides, context);
+      properties.put("max.poll.records", count);
+
+      return tailMessages(topic, count, properties);
+    }
+
+    /**
+     * Gets messages from the tail end of a Kafka topic.
+     *
+     * @param topic The name of the kafka topic.
+     * @param count The maximum number of messages to get.
+     * @param properties The function configuration properties.
+     * @return A list of messages from the tail end of a Kafka topic.
+     */
+    private Object tailMessages(String topic, int count, Properties properties) {
 
-      // ensures messages pulled from latest offset, versus a previously stored consumer offset
-      properties.put("group.id", generateGroupId());
-      properties.put("auto.offset.reset", "latest");
+      List<Object> messages = new ArrayList<>();
+      int pollTimeout = getPollTimeout(properties);
+      int maxWait = getMaxWait(properties);
 
+      // create the consumer
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
-        consumer.subscribe(Arrays.asList(topic));
 
-        int maxAttempts = getMaxAttempts(properties);
-        int i = 0;
-        while(messages.size() < count && i++ < maxAttempts) {
-          consumer.poll(POLL_TIMEOUT).forEach(record -> messages.add(record.value()));
+        // seek to the end of all topic/partitions
+        Set<TopicPartition> partitions = manualPartitionAssignment(topic, consumer);
+        consumer.seekToEnd(partitions);
+
+        // continue until we have enough messages or exceeded the max wait time
+        long wait = 0L;
+        final long start = clock.currentTimeMillis();
+        while(messages.size() < count && wait < maxWait) {
+
+          for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
+            messages.add(record.value());
+          }
+
+          // how long have we waited?
+          wait = clock.currentTimeMillis() - start;
           consumer.commitSync();
+
+          LOG.debug("KAFKA_TAIL polled for messages; topic={}, count={}, 
waitTime={} ms",
+                  topic, messages.size(), wait);
         }
       }
 
@@ -353,6 +455,31 @@ public class KafkaFunctions {
   }
 
   /**
+   * Manually assigns all partitions in a topic to a consumer.
+   *
+   * @param topic The topic whose partitions will be assigned.
+   * @param consumer The consumer to assign partitions to.
+   * @return A set of topic-partitions that were manually assigned to the consumer.
+   */
+  private static Set<TopicPartition> manualPartitionAssignment(String topic, KafkaConsumer<String, String> consumer) {
+
+    // find all partitions for the topic
+    Set<TopicPartition> partitions = new HashSet<>();
+    for(PartitionInfo partition : consumer.partitionsFor(topic)) {
+      partitions.add(new TopicPartition(topic, partition.partition()));
+    }
+
+    if(partitions.size() == 0) {
+      throw new IllegalStateException(format("No partitions available for consumer assignment; topic=%s", topic));
+    }
+
+    // manually assign this consumer to each partition in the topic
+    consumer.assign(partitions);
+
+    return partitions;
+  }
+
+  /**
    * Assembles the set of Properties required by the Kafka client.
    *
   * A set of default properties has been defined to provide minimum functionality.
@@ -382,13 +509,40 @@ public class KafkaFunctions {
   }
 
   /**
-   * Determine how many poll attempts should be made based on the user's patience.
-   * @param properties
-   * @return The maximum number of poll attempts to make.
+   * Return the max wait time setting.
+   *
+   * @param properties The function configuration properties.
+   * @return The max wait time in milliseconds.
    */
-  private static int getMaxAttempts(Properties properties) {
-    int maxWait = ConversionUtils.convert(properties.get(MAX_WAIT_PROPERTY), Integer.class);
-    return maxWait / POLL_TIMEOUT;
+  private static int getMaxWait(Properties properties) {
+    int maxWait = DEFAULT_MAX_WAIT;
+
+    Object value = properties.get(MAX_WAIT_PROPERTY);
+    if(value != null) {
+      maxWait = ConversionUtils.convert(value, Integer.class);
+    }
+
+    return maxWait;
+  }
+
+  /**
+   * Returns the poll timeout setting.
+   *
+   * <p>The maximum amount of time waited each time that Kafka is polled
+   * for messages.
+   *
+   * @param properties The function configuration properties.
+   * @return The poll timeout in milliseconds.
+   */
+  private static int getPollTimeout(Properties properties) {
+    int pollTimeout = DEFAULT_POLL_TIMEOUT;
+
+    Object value = properties.get(POLL_TIMEOUT_PROPERTY);
+    if(value != null) {
+      pollTimeout = ConversionUtils.convert(value, Integer.class);
+    }
+
+    return pollTimeout;
   }
 
   /**
@@ -399,15 +553,7 @@ public class KafkaFunctions {
 
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "localhost:9092");
-
-    /*
-     * A unique 'group.id' is generated when creating these default properties.  This
-     * value is used when storing Kafka consumer offsets.  Multiple executions of any
-     * KAFKA_* functions, within the same Stellar REPL session, must maintain the same
-     * 'group.id'.  At the same time, different Stellar REPL sessions running
-     * simultaneously should each have their own 'group.id'.
-     */
-    properties.put("group.id", generateGroupId());
+    properties.put("group.id", "kafka-functions-stellar");
 
     /*
      * What to do when there is no initial offset in Kafka or if the current
@@ -431,16 +577,12 @@ public class KafkaFunctions {
     properties.put("key.serializer", StringSerializer.class.getName());
     properties.put("value.serializer", StringSerializer.class.getName());
 
-    // the maximum time to wait for messages
-    properties.put(MAX_WAIT_PROPERTY, 5000);
+    // set the default max time to wait for messages
+    properties.put(MAX_WAIT_PROPERTY, DEFAULT_MAX_WAIT);
 
-    return properties;
-  }
+    // set the default poll timeout
+    properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT);
 
-  /**
-   * Generates a unique 'group.id' for a session.
-   */
-  private static String generateGroupId() {
-    return String.format("stellar-shell-%s", UUID.randomUUID().toString());
+    return properties;
   }
 }
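
To summarize the change above: the old attempt counting (maxWait / POLL_TIMEOUT) is replaced by two tunables, 'stellar.kafka.poll.timeout' (wait per poll, default 500 ms) and 'stellar.kafka.max.wait.millis' (total wait per call, default 5000 ms), either of which may be supplied through the optional config argument of the KAFKA_* functions. A simplified sketch of the fallback behavior; the committed code uses ConversionUtils where this sketch uses plain parsing:

import java.util.Properties;

public class KafkaTunablesSketch {

  // defaults matching the diff above
  static final int DEFAULT_POLL_TIMEOUT = 500;   // per poll() call
  static final int DEFAULT_MAX_WAIT = 5000;      // total, per function call

  // mirrors getPollTimeout()/getMaxWait(): use the property when present,
  // otherwise fall back to the default
  static int intProperty(Properties properties, String key, int defaultValue) {
    Object value = properties.get(key);
    return (value == null) ? defaultValue : Integer.parseInt(value.toString());
  }

  public static void main(String[] args) {
    // a caller can override either knob, e.g. via the optional config map:
    //   KAFKA_TAIL('topic', 1, { "stellar.kafka.max.wait.millis": 2000 })
    Properties properties = new Properties();
    properties.put("stellar.kafka.max.wait.millis", 2000);

    // prints 2000 (overridden) and 500 (default)
    System.out.println(intProperty(properties, "stellar.kafka.max.wait.millis", DEFAULT_MAX_WAIT));
    System.out.println(intProperty(properties, "stellar.kafka.poll.timeout", DEFAULT_POLL_TIMEOUT));
  }
}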

http://git-wip-us.apache.org/repos/asf/metron/blob/39e22fa1/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index 7a42d8b..74c6705 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -18,20 +18,23 @@
 
 package org.apache.metron.management;
 
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.DefaultVariableResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.stellar.common.StellarProcessor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.DefaultVariableResolver;
+import org.apache.metron.stellar.dsl.functions.MapFunctions;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -39,7 +42,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
@@ -55,11 +61,28 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   private static final String message2 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 23 }";
   private static final String message3 = "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 29011 }";
 
-  private static Map<String, Object> variables = new HashMap<>();
+  private static Map<String, Object> variables;
   private static ZKServerComponent zkServerComponent;
   private static KafkaComponent kafkaComponent;
   private static ComponentRunner runner;
   private static Properties global;
+  private static FunctionResolver functionResolver;
+  private static ExecutorService executor;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @BeforeClass
+  public static void setupExecutor() {
+    executor = Executors.newFixedThreadPool(2);
+  }
+
+  @AfterClass
+  public static void tearDownExecutor() {
+    if(executor != null && !executor.isShutdown()) {
+      executor.shutdown();
+    }
+  }
 
   @BeforeClass
   public static void setupKafka() throws Exception {
@@ -67,7 +90,6 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Properties properties = new Properties();
     zkServerComponent = getZKServerComponent(properties);
     kafkaComponent = getKafkaComponent(properties, new ArrayList<>());
-
     runner = new ComponentRunner.Builder()
             .withComponent("zk", zkServerComponent)
             .withComponent("kafka", kafkaComponent)
@@ -78,10 +100,23 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     runner.start();
   }
 
+  @BeforeClass
+  public static void setupFunctionResolver() {
+
+    // used when executing Stellar expressions
+    functionResolver = new SimpleFunctionResolver()
+            .withClass(KafkaFunctions.KafkaGet.class)
+            .withClass(KafkaFunctions.KafkaPut.class)
+            .withClass(KafkaFunctions.KafkaProps.class)
+            .withClass(KafkaFunctions.KafkaTail.class)
+            .withClass(MapFunctions.MapGet.class);
+  }
+
   @Before
   public void setup() {
 
     // messages that will be read/written during the tests
+    variables = new HashMap<>();
     variables.put("message1", message1);
     variables.put("message2", message2);
     variables.put("message3", message3);
@@ -105,23 +140,44 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
-   * Write one message, read one message.
+   * KAFKA_PUT should be able to write one message to a topic.
+   * KAFKA_GET should be able to read one message from a topic.
    */
   @Test
-  public void testOneMessage() {
-    run("KAFKA_PUT('topic1', [message1])");
-    Object actual = run("KAFKA_GET('topic1')");
+  public void testKafkaPut() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic
+    run("KAFKA_PUT(topic, [message1])");
+
+    // get a message from the topic
+    Object actual = run("KAFKA_GET(topic)");
+
+    // validate
     assertEquals(Collections.singletonList(message1), actual);
   }
 
   /**
-   * Multiple messages can be read with a single call.
+   * KAFKA_PUT should be able to write multiple messages passed as a List.
+   * KAFKA_GET should be able to read multiple messages at once.
    */
   @Test
-  public void testMultipleMessages() {
-    run("KAFKA_PUT('topic2', [message1, message2, message3])");
-    Object actual = run("KAFKA_GET('topic2', 3)");
+  public void testKafkaPutThenGetWithMultipleMessages() {
 
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic
+    run("KAFKA_PUT(topic, [message1, message2, message3])");
+
+    // get 3 messages from the topic
+    Object actual = run("KAFKA_GET(topic, 3)");
+
+    // validate that all 3 messages were read
     List<String> expected = new ArrayList<String>() {{
       add(message1);
       add(message2);
@@ -131,20 +187,99 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_GET should maintain its consumer offsets and reuse them across subsequent calls.
+   *
    * Does the client maintain the consumer offset correctly?
+   *
+   * The offsets must be maintained correctly to read messages sequentially, in order,
+   * across separate executions of KAFKA_GET.
    */
   @Test
-  public void testConsumerOffsets() {
-    run("KAFKA_PUT('topic3', [message1, message2, message3])");
+  public void testKafkaGetWithSequentialReads() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic
+    run("KAFKA_PUT(topic, [message1, message2, message3])");
 
-    // the offsets must be maintained correctly for us to read each message, in order,
-    // sequentially across separate calls to KAFKA_GET
-    assertEquals(Collections.singletonList(message1), run("KAFKA_GET('topic3', 1)"));
-    assertEquals(Collections.singletonList(message2), run("KAFKA_GET('topic3', 1)"));
-    assertEquals(Collections.singletonList(message3), run("KAFKA_GET('topic3', 1)"));
+    // read the first message
+    assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic, 1)"));
+
+    // pick-up from where we left off and read the second message
+    assertEquals(Collections.singletonList(message2), run("KAFKA_GET(topic, 1)"));
+
+    // pick-up from where we left off and read the third message
+    assertEquals(Collections.singletonList(message3), run("KAFKA_GET(topic, 1)"));
+
+    // no more messages left to read
+    assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)"));
   }
 
   /**
+   * KAFKA_GET should return nothing if a topic does not exist.
+   */
+  @Test
+  public void testKafkaGetWithNonExistentTopic() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // expect no messages since the topic does not exist
+    assertEquals(Collections.emptyList(), run("KAFKA_GET(topic, 1)"));
+  }
+
+  /**
+   * KAFKA_TAIL should return new messages from the end of a topic.
+   */
+  @Test
+  public void testKafkaTail() throws Exception {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic; KAFKA_TAIL should NOT retrieve these
+    run("KAFKA_PUT(topic, [message2, message2, message2])");
+
+    // get a message from the topic; will block until messages arrive
+    Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)");
+
+    // put 10 messages onto the topic for KAFKA_TAIL to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));
+
+    // expect to receive message1, which was added to the topic while KAFKA_TAIL was running
+    Object actual = tailFuture.get(10, TimeUnit.SECONDS);
+    List<String> expected = Collections.singletonList(message1);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * KAFKA_TAIL should always seek to the end of the topic.  If no messages arrive after the 'seek to end',
+   * then no messages will be returned.
+   */
+  @Test
+  public void testKafkaTailNone() {
+
+    // shorten the max wait time so we do not have to wait so long
+    global.put(KafkaFunctions.MAX_WAIT_PROPERTY, 2000);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic
+    run("KAFKA_PUT(topic, [message1, message2, message3])");
+
+    // no messages to read as KAFKA_TAIL should "seek to end" of the topic
+    assertEquals(Collections.emptyList(), run("KAFKA_TAIL(topic, 1)"));
+  }
+
+  /**
+   * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer.
+   *
    * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
    * properties into a single set of properties.  The global properties should override any default properties.
    */
@@ -162,6 +297,8 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_PROPS should allow the global properties to be overridden.
+   *
    * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
    * properties into a single set of properties.  The user properties should override any default or global properties.
    */
@@ -180,26 +317,59 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Map<String, String> properties = (Map<String, String>) run(expression);
     assertEquals(expected, properties.get(overriddenKey));
   }
-
+  
   /**
    * Runs a Stellar expression.
-   * @param expr The expression to run.
+   * @param expression The expression to run.
    */
-  private Object run(String expr) {
+  private Object run(String expression) {
 
     // make the global properties available to the function
     Context context = new Context.Builder()
             .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
             .build();
 
-    FunctionResolver functionResolver = new SimpleFunctionResolver()
-            .withClass(KafkaFunctions.KafkaGet.class)
-            .withClass(KafkaFunctions.KafkaPut.class)
-            .withClass(KafkaFunctions.KafkaProps.class)
-            .withClass(KafkaFunctions.KafkaTail.class);
-
+    // execute the expression
     StellarProcessor processor = new StellarProcessor();
-    return processor.parse(expr, new DefaultVariableResolver(x -> variables.get(x),x -> variables.containsKey(x)), functionResolver, context);
+    return processor.parse(expression,
+            new DefaultVariableResolver(
+                    x -> variables.get(x),
+                    x -> variables.containsKey(x)),
+            functionResolver,
+            context);
   }
 
+  /**
+   * Runs a Stellar expression asynchronously.
+   *
+   * <p>Does not block waiting for the expression to complete.
+   *
+   * @param expression The expression to run.
+   * @return A Future that will contain the result of executing the expression.
+   */
+  private Future<Object> runAsync(String expression) {
+    return executor.submit(() -> run(expression));
+  }
+
+  /**
+   * Runs a set of Stellar expression asynchronously and waits
+   * for each to complete before returning.
+   *
+   * @param expressions The expressions to execute.
+   * @throws Exception
+   */
+  private void runAsyncAndWait(Iterable<String> expressions) throws Exception {
+
+    // submit each expression for asynchronous execution
+    List<Future<Object>> putFutures = new ArrayList<>();
+    for(String expression: expressions) {
+      Future<Object> future = runAsync(expression);
+      putFutures.add(future);
+    }
+
+    // wait for the puts to complete
+    for(Future<Object> future: putFutures) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+  }
 }
