METRON-1787 Input Time Constraints for Batch Profiler (nickwallen) closes 
apache/metron#1209


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fa3be8d3
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fa3be8d3
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fa3be8d3

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: fa3be8d32ccadcd11edad046cbd063cec3a20624
Parents: 1545978
Author: nickwallen <n...@nickallen.org>
Authored: Wed Sep 26 18:13:30 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Wed Sep 26 18:13:30 2018 -0400

----------------------------------------------------------------------
 .../clock/EventTimeOnlyClockFactory.java        |  58 ++++++++
 .../clock/EventTimeOnlyClockFactoryTest.java    |  61 +++++++++
 .../metron-profiler-spark/README.md             |  19 +++
 .../metron/profiler/spark/BatchProfiler.java    |  38 +++++-
 .../profiler/spark/BatchProfilerConfig.java     |   6 +-
 .../metron/profiler/spark/TimestampParser.java  |  55 ++++++++
 .../spark/function/MessageRouterFunction.java   | 106 +++++++++++++--
 .../spark/BatchProfilerIntegrationTest.java     |  58 +++++++-
 .../profiler/spark/TimestampParserTest.java     |  67 ++++++++++
 .../function/MessageRouterFunctionTest.java     | 133 +++++++++++++++++--
 10 files changed, 579 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java
new file mode 100644
index 0000000..2f9ca7c
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+import java.io.Serializable;
+
+/**
+ * Creates a {@link Clock} based on the profiler configuration.  This should
+ * be used in cases where only event time is accceptable.
+ *
+ * <p>If the Profiler is configured to use event time, a {@link 
EventTimeClock} will
+ * be created.  Otherwise, an {@link IllegalStateException} is thrown.
+ */
+public class EventTimeOnlyClockFactory implements ClockFactory, Serializable {
+
+  /**
+   * If the Profiler is configured to use event time, a {@link EventTimeClock} 
is created.
+   * Otherwise, an {@link IllegalArgumentException} is thrown.
+   *
+   * @param config The profiler configuration.
+   * @return The appropriate Clock based on the profiler configuration.
+   * @throws IllegalStateException If the profiler configuration is set to 
system time.
+   */
+  @Override
+  public Clock createClock(ProfilerConfig config) {
+    Clock clock;
+
+    boolean isEventTime = config.getTimestampField().isPresent();
+    if(isEventTime) {
+      String timestampField = config.getTimestampField().get();
+      clock = new EventTimeClock(timestampField);
+
+    } else {
+      throw new IllegalStateException("Expected profiler to use event time.");
+    }
+
+    return clock;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java
new file mode 100644
index 0000000..f1d4114
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the {@link EventTimeOnlyClockFactory}.
+ */
+public class EventTimeOnlyClockFactoryTest {
+
+  private EventTimeOnlyClockFactory clockFactory;
+
+  @Before
+  public void setup() {
+    clockFactory = new EventTimeOnlyClockFactory();
+  }
+
+  @Test
+  public void testCreateEventTimeClock() {
+    // configure the profiler to use event time
+    ProfilerConfig config = new ProfilerConfig();
+    config.setTimestampField(Optional.of("timestamp"));
+
+    // the factory should return a clock that handles 'event time'
+    Clock clock = clockFactory.createClock(config);
+    assertTrue(clock instanceof EventTimeClock);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testCreateProcessingTimeClock() {
+    // the profiler uses processing time by default
+    ProfilerConfig config = new ProfilerConfig();
+    clockFactory.createClock(config);
+    fail("Expected exception");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md 
b/metron-analytics/metron-profiler-spark/README.md
index 99e8c7e..df143f1 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -245,6 +245,8 @@ You can store both settings for the Profiler along with 
settings for Spark in th
 |---                                                                           
 |---
 | [`profiler.batch.input.path`](#profilerbatchinputpath)                       
 | The path to the input data read by the Batch Profiler.
 | [`profiler.batch.input.format`](#profilerbatchinputformat)                   
 | The format of the input data read by the Batch Profiler.
+| [`profiler.batch.input.begin`](#profilerbatchinputend)                       
 | Only messages with a timestamp after this will be profiled.
+| [`profiler.batch.input.end`](#profilerbatchinputbegin)                       
 | Only messages with a timestamp before this will be profiled.
 | [`profiler.period.duration`](#profilerperiodduration)                        
 | The duration of each profile period.  
 | [`profiler.period.duration.units`](#profilerperioddurationunits)             
 | The units used to specify the 
[`profiler.period.duration`](#profilerperiodduration).
 | [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                   
 | A salt is prepended to the row key to help prevent hot-spotting.
@@ -263,6 +265,23 @@ The path to the input data read by the Batch Profiler.
 
 The format of the input data read by the Batch Profiler.
 
+### `profiler.batch.input.begin`
+
+*Default*: undefined; no time constraint
+
+Only messages with a timestamp equal to or after this will be profiled. The 
Profiler will only profiles messages with a timestamp in 
[`profiler.batch.input.begin`, `profiler.batch.input.end`] inclusive.
+
+By default, no time constraint is defined. The value is expected to follow the 
[ISO-8601 instant 
format](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT);
 2011-12-03T10:15:30Z.
+
+
+### `profiler.batch.input.end`
+
+*Default*: undefined; no time constraint
+
+Only messages with a timestamp before or equal to this will be profiled. The 
Profiler will only profiles messages with a timestamp in 
[`profiler.batch.input.begin`, `profiler.batch.input.end`] inclusive.
+
+By default, no time constraint is defined. The value is expected to follow the 
[ISO-8601 instant 
format](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT);
 2011-12-03T10:15:30Z.
+
 ### `profiler.period.duration`
 
 *Default*: 15

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index d75abc3..39f8b3a 100644
--- 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
+import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN;
+import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
 import static org.apache.spark.sql.functions.sum;
@@ -51,6 +53,12 @@ public class BatchProfiler implements Serializable {
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private TimestampParser timestampParser;
+
+  public BatchProfiler() {
+    this.timestampParser = new TimestampParser();
+  }
+
   /**
    * Execute the Batch Profiler.
    *
@@ -69,7 +77,6 @@ public class BatchProfiler implements Serializable {
 
     LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
     Map<String, String> globals = Maps.fromProperties(globalProperties);
-
     String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, 
String.class);
     String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
     LOG.debug("Loading telemetry from '{}'", inputPath);
@@ -85,7 +92,7 @@ public class BatchProfiler implements Serializable {
 
     // find all routes for each message
     Dataset<MessageRoute> routes = telemetry
-            .flatMap(new MessageRouterFunction(profiles, globals), 
Encoders.bean(MessageRoute.class));
+            .flatMap(messageRouterFunction(profilerProps, profiles, globals), 
Encoders.bean(MessageRoute.class));
     LOG.debug("Generated {} message route(s)", routes.cache().count());
 
     // build the profiles
@@ -104,4 +111,29 @@ public class BatchProfiler implements Serializable {
 
     return count;
   }
+
+  /**
+   * Builds the function that performs message routing.
+   *
+   * @param profilerProps The profiler configuration properties.
+   * @param profiles The profile definitions.
+   * @param globals The Stellar global properties.
+   * @return A {@link MessageRouterFunction}.
+   */
+  private MessageRouterFunction messageRouterFunction(
+          Properties profilerProps,
+          ProfilerConfig profiles,
+          Map<String, String> globals) {
+    MessageRouterFunction routerFunction = new MessageRouterFunction(profiles, 
globals);
+
+    // an optional time constraint to limit how far back to look for telemetry
+    Optional<Long> beginAt = 
timestampParser.parse(TELEMETRY_INPUT_BEGIN.get(profilerProps, String.class));
+    beginAt.ifPresent(begin -> routerFunction.withBegin(begin));
+
+    // an optional time constraint to limit the most recent telemetry
+    Optional<Long> endAt = 
timestampParser.parse(TELEMETRY_INPUT_END.get(profilerProps, String.class));
+    endAt.ifPresent(end -> routerFunction.withEnd(end));
+
+    return routerFunction;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
index 054806e..e8cd160 100644
--- 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
+++ 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -46,7 +46,11 @@ public enum BatchProfilerConfig {
 
   TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class),
 
-  TELEMETRY_INPUT_PATH("profiler.batch.input.path", 
"hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class);
+  TELEMETRY_INPUT_PATH("profiler.batch.input.path", 
"hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class),
+
+  TELEMETRY_INPUT_BEGIN("profiler.batch.input.begin", "", String.class),
+
+  TELEMETRY_INPUT_END("profiler.batch.input.end", "", String.class);
 
   /**
    * The key for the configuration value.

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java
 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java
new file mode 100644
index 0000000..4be8b3e
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.Optional;
+
+/**
+ * Parses an input string and returns a timestamp in epoch milliseconds.
+ */
+public class TimestampParser {
+
+  /**
+   * Parses an input string and returns an optional timestamp in epoch 
milliseconds.
+   *
+   * @param inputString The input defining a timestamp.
+   * @return A timestamp in epoch milliseconds.
+   */
+  public Optional<Long> parse(String inputString) {
+    Optional<Long> epochMilli = Optional.empty();
+
+    // a blank is acceptable and treated as undefined
+    if (StringUtils.isNotBlank(inputString)) {
+      epochMilli = Optional.of(new DateTimeFormatterBuilder()
+              .append(DateTimeFormatter.ISO_INSTANT)
+              .toFormatter()
+              .parse(inputString, Instant::from)
+              .toEpochMilli());
+    }
+
+    return epochMilli;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
index cf8029f..31734d0 100644
--- 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
+++ 
b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
@@ -23,7 +23,11 @@ import 
org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.DefaultMessageRouter;
 import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.EventTimeOnlyClockFactory;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -54,9 +58,27 @@ public class MessageRouterFunction implements 
FlatMapFunction<String, MessageRou
    */
   private ProfilerConfig profilerConfig;
 
+  /**
+   * A clock that can extract time from the messages received.
+   */
+  private Clock clock;
+
+  /**
+   * Only messages with a timestamp after this will be routed.
+   */
+  private Long begin;
+
+  /**
+   * Only messages with a timestamp before this will be routed.
+   */
+  private Long end;
+
   public MessageRouterFunction(ProfilerConfig profilerConfig, Map<String, 
String> globals) {
     this.profilerConfig = profilerConfig;
     this.globals = globals;
+    this.begin = Long.MIN_VALUE;
+    this.end = Long.MAX_VALUE;
+    withClockFactory(new EventTimeOnlyClockFactory());
   }
 
   /**
@@ -70,8 +92,7 @@ public class MessageRouterFunction implements 
FlatMapFunction<String, MessageRou
    */
   @Override
   public Iterator<MessageRoute> call(String jsonMessage) throws Exception {
-    List<MessageRoute> routes;
-
+    List<MessageRoute> routes = Collections.emptyList();
     JSONParser parser = new JSONParser();
     Context context = TaskUtils.getContext(globals);
     MessageRouter router = new DefaultMessageRouter(context);
@@ -80,20 +101,67 @@ public class MessageRouterFunction implements 
FlatMapFunction<String, MessageRou
     Optional<JSONObject> message = toMessage(jsonMessage, parser);
     if(message.isPresent()) {
 
-      // find all routes
-      routes = router.route(message.get(), profilerConfig, context);
-      LOG.trace("Found {} route(s) for a message", routes.size());
+      // extract the timestamp from the message
+      Optional<Long> timestampOpt = clock.currentTimeMillis(message.get());
+      if (timestampOpt.isPresent()) {
+
+        // timestamp must be in [begin, end]
+        Long timestamp = timestampOpt.get();
+        if(timestamp >= begin && timestamp <= end) {
+          routes = router.route(message.get(), profilerConfig, context);
+          LOG.trace("Found {} route(s) for a message", routes.size());
+
+        } else {
+          LOG.trace("Ignoring message; timestamp={} not in [{},{}]", 
timestamp, prettyPrint(begin), prettyPrint(end));
+        }
+
+      } else {
+        LOG.trace("No timestamp in message. Message will be ignored.");
+      }
 
     } else {
-      // the message is not valid and must be ignored
-      routes = Collections.emptyList();
-      LOG.trace("No route possible. Unable to parse message.");
+      LOG.trace("Unable to parse message. Message will be ignored");
     }
 
     return routes.iterator();
   }
 
   /**
+   * Set a time constraint.
+   *
+   * @param begin Only messages with a timestamp after this will be routed.
+   * @return The message router function
+   */
+  public MessageRouterFunction withBegin(Long begin) {
+    this.begin = begin;
+    return this;
+  }
+
+  /**
+   * Set a time constraint.
+   *
+   * @param end Only messages with a timestamp before this will be routed.
+   * @return The message router function
+   */
+  public MessageRouterFunction withEnd(Long end) {
+    this.end = end;
+    return this;
+  }
+
+  /**
+   * Defines the {@link ClockFactory} used to create the {@link Clock}.
+   *
+   * <p>Calling this method is only needed to override the default behavior.
+   *
+   * @param clockFactory The factory to use for creating the {@link Clock}.
+   * @return The message router function.
+   */
+  public MessageRouterFunction withClockFactory(ClockFactory clockFactory) {
+    this.clock = clockFactory.createClock(profilerConfig);
+    return this;
+  }
+
+  /**
    * Parses the raw JSON of a message.
    *
    * @param json The raw JSON to parse.
@@ -110,4 +178,26 @@ public class MessageRouterFunction implements 
FlatMapFunction<String, MessageRou
       return Optional.empty();
     }
   }
+
+  /**
+   * Pretty prints a Long value for use when logging these values.
+   *
+   * <p>Long.MIN_VALUE and Long.MAX_VALUE will occur frequently and is 
difficult to grok in the logs.  Instead
+   * Long.MIN_VALUE is rendered as "MIN". Long.MAX_VALUE is srendered as 
"MAX". All other values are rendered
+   * directly.
+   *
+   * @param value The value to pretty print.
+   * @return
+   */
+  private static String prettyPrint(Long value) {
+    String result;
+    if(value == Long.MIN_VALUE) {
+      result = "MIN";
+    } else if(value == Long.MAX_VALUE) {
+      result = "MAX";
+    } else {
+      result = value.toString();
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 87c4246..c33644f 100644
--- 
a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -38,8 +38,11 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -51,6 +54,8 @@ import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PRO
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN;
+import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
 import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
 import static org.junit.Assert.assertTrue;
@@ -60,6 +65,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class BatchProfilerIntegrationTest {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   /**
    * {
    *   "timestampField": "timestamp",
@@ -212,6 +218,54 @@ public class BatchProfilerIntegrationTest {
     validateProfiles();
   }
 
+  @Test
+  public void testBatchProfilerWithEndTimeConstraint() throws Exception {
+    // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), 
"src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    // there are 40 messages before "2018-07-07T15:51:48Z" in the test data
+    profilerProperties.put(TELEMETRY_INPUT_BEGIN.getKey(), "");
+    profilerProperties.put(TELEMETRY_INPUT_END.getKey(), 
"2018-07-07T15:51:48Z");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, 
getProfile());
+
+    // the max timestamp in the data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
+
+    // the 'window' looks up to 5 hours before the max timestamp
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+    assertTrue(execute("[12] == PROFILE_GET('count-by-ip', '192.168.66.1', 
window)", Boolean.class));
+    assertTrue(execute("[28] == PROFILE_GET('count-by-ip', '192.168.138.158', 
window)", Boolean.class));
+    assertTrue(execute("[40] == PROFILE_GET('total-count', 'total', window)", 
Boolean.class));
+  }
+
+  @Test
+  public void testBatchProfilerWithBeginTimeConstraint() throws Exception {
+    // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), 
"src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    // there are 60 messages after "2018-07-07T15:51:48Z" in the test data
+    profilerProperties.put(TELEMETRY_INPUT_BEGIN.getKey(), 
"2018-07-07T15:51:48Z");
+    profilerProperties.put(TELEMETRY_INPUT_END.getKey(), "");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, 
getProfile());
+
+    // the max timestamp in the data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
+
+    // the 'window' looks up to 5 hours before the max timestamp
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+    assertTrue(execute("[14] == PROFILE_GET('count-by-ip', '192.168.66.1', 
window)", Boolean.class));
+    assertTrue(execute("[46] == PROFILE_GET('count-by-ip', '192.168.138.158', 
window)", Boolean.class));
+    assertTrue(execute("[60] == PROFILE_GET('total-count', 'total', window)", 
Boolean.class));
+  }
+
   /**
    * Validates the profiles that were built.
    *
@@ -263,6 +317,8 @@ public class BatchProfilerIntegrationTest {
    * @return The result of executing the Stellar expression.
    */
   private <T> T execute(String expression, Class<T> clazz) {
-    return executor.execute(expression, Collections.emptyMap(), clazz);
+    T results = executor.execute(expression, Collections.emptyMap(), clazz);
+    LOG.debug("{} = {}", expression, results);
+    return results;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java
 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java
new file mode 100644
index 0000000..f760b35
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.format.DateTimeParseException;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TimestampParserTest {
+
+  private TimestampParser parser;
+
+  @Before
+  public void setup() {
+    parser = new TimestampParser();
+  }
+
+  @Test
+  public void testEmpty() {
+    Optional<Long> millis = parser.parse("");
+    assertFalse(millis.isPresent());
+  }
+
+  @Test
+  public void testBlank() {
+    Optional<Long> millis = parser.parse("      ");
+    assertFalse(millis.isPresent());
+  }
+
+  @Test
+  public void testIsoInstantFormat() {
+    // ISO-8601 instant format
+    Optional<Long> millis = parser.parse("2011-12-03T10:15:30Z");
+    assertTrue(millis.isPresent());
+    assertEquals(1322907330000L, millis.get().longValue());
+  }
+
+  @Test(expected = DateTimeParseException.class)
+  public void testInvalidFormat() {
+    parser.parse("1537502400000");
+    fail("Expected exception");
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
index ceaa7cd..9a2cbf4 100644
--- 
a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
+++ 
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java
@@ -31,14 +31,17 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Tests the {@link MessageRouterFunction}.
  */
 public class MessageRouterFunctionTest {
 
+  private Long messageTimestamp = 1537468508360L;
+
   /**
-   * { "ip_src_addr": "192.168.1.22" }
+   * { "ip_src_addr": "192.168.1.22", "timestamp": 1537468508360 }
    */
   @Multiline
   private String goodMessage;
@@ -48,9 +51,15 @@ public class MessageRouterFunctionTest {
    */
   private String badMessage;
 
+  /**
+   * { "ip_src_addr": "192.168.1.22" }
+   */
+  @Multiline
+  private String messageNoTimestamp;
+
   @Test
   public void testFindRoutes() throws Exception {
-    MessageRouterFunction function = new MessageRouterFunction(oneProfile(), 
getGlobals());
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals());
     Iterator<MessageRoute> iter = function.call(goodMessage);
 
     List<MessageRoute> routes = Lists.newArrayList(iter);
@@ -58,14 +67,20 @@ public class MessageRouterFunctionTest {
     Assert.assertEquals("profile1", 
routes.get(0).getProfileDefinition().getProfile());
   }
 
-  /**
-   * A bad or invalid message should return no routes.
-   */
+  @Test(expected = IllegalStateException.class)
+  public void testWithSystemTime() throws Exception {
+    MessageRouterFunction function = new 
MessageRouterFunction(profileWithSystemTime(), getGlobals());
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    Assert.fail("Exception expected as system time is not supported.");
+  }
+
   @Test
   public void testWithBadMessage() throws Exception {
-    MessageRouterFunction function = new MessageRouterFunction(oneProfile(), 
getGlobals());
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals());
     Iterator<MessageRoute> iter = function.call(badMessage);
 
+    // an invalid message should return no routes
     List<MessageRoute> routes = Lists.newArrayList(iter);
     Assert.assertEquals(0, routes.size());
   }
@@ -81,7 +96,88 @@ public class MessageRouterFunctionTest {
     Assert.assertEquals("profile2", 
routes.get(1).getProfileDefinition().getProfile());
   }
 
-  private ProfilerConfig oneProfile() {
+  @Test
+  public void testWithNoTimestampInMessage() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals());
+    Iterator<MessageRoute> iter = function.call(messageNoTimestamp);
+
+    // with no timestamp, the message should be ignored
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(0, routes.size());
+  }
+
+  @Test
+  public void testMessageFilteredByBegin() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withBegin(messageTimestamp + 1000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should be filtered because it is before `beginAt`
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(0, routes.size());
+  }
+
+  @Test
+  public void testMessageNotFilteredByBegin() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withBegin(messageTimestamp - 1000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should NOT be filtered because it is after 'beginAt'
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(1, routes.size());
+  }
+
+  @Test
+  public void testMessageFilteredByEnd() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withEnd(messageTimestamp - 1000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should be filtered because it is after 'endAt'
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(0, routes.size());
+  }
+
+  @Test
+  public void testMessageNotFilteredByEnd() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withEnd(messageTimestamp + 1000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should NOT be filtered because it is before 'endAt'
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(1, routes.size());
+  }
+
+  @Test
+  public void testMessageFilteredByBeginAndEnd() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withBegin(messageTimestamp + 1000)
+            .withEnd(messageTimestamp + 2000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should be filtered because it is outside of [beginAt, endAt]
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(0, routes.size());
+  }
+
+  @Test
+  public void testMessageNotFilteredByBeginAndEnd() throws Exception {
+    MessageRouterFunction function = new MessageRouterFunction(profile(), 
getGlobals())
+            .withBegin(messageTimestamp - 1000)
+            .withEnd(messageTimestamp + 1000);
+    Iterator<MessageRoute> iter = function.call(goodMessage);
+
+    // the message should NOT be filtered because it is after 'endAt'
+    List<MessageRoute> routes = Lists.newArrayList(iter);
+    Assert.assertEquals(1, routes.size());
+  }
+
+  /**
+   * Creates a profiler definition, using event time, and containing one 
profile.
+   */
+  private ProfilerConfig profile() {
     ProfileConfig profile = new ProfileConfig()
             .withProfile("profile1")
             .withForeach("ip_src_addr")
@@ -89,9 +185,13 @@ public class MessageRouterFunctionTest {
             .withResult("count");
 
     return new ProfilerConfig()
-            .withProfile(profile);
+            .withProfile(profile)
+            .withTimestampField(Optional.of("timestamp"));
   }
 
+  /**
+   * Creates a profiler definition, using event time, and containing two 
profiles.
+   */
   private ProfilerConfig twoProfiles() {
     ProfileConfig profile1 = new ProfileConfig()
             .withProfile("profile1")
@@ -105,7 +205,22 @@ public class MessageRouterFunctionTest {
             .withResult("count");
     return new ProfilerConfig()
             .withProfile(profile1)
-            .withProfile(profile2);
+            .withProfile(profile2)
+            .withTimestampField(Optional.of("timestamp"));
+  }
+
+  /**
+   * Creates a profiler definition using system time.
+   */
+  private ProfilerConfig profileWithSystemTime() {
+    ProfileConfig profile = new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+
+    return new ProfilerConfig()
+            .withProfile(profile);
   }
 
   private Map<String, String> getGlobals() {

Reply via email to