METRON-1772 Support alternative input formats in the Batch Profiler (nickwallen) closes apache/metron#1191
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1545978e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1545978e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1545978e

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 1545978e169a01e4a06735b8713c8fa65373a394
Parents: f83f0ac
Author: nickwallen <n...@nickallen.org>
Authored: Wed Sep 19 10:11:28 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Wed Sep 19 10:11:28 2018 -0400

----------------------------------------------------------------------
 .../metron-profiler-spark/README.md            | 47 +++++++++-
 metron-analytics/metron-profiler-spark/pom.xml | 18 ++--
 .../metron/profiler/spark/BatchProfiler.java   | 21 +++--
 .../profiler/spark/cli/BatchProfilerCLI.java   | 40 +++++++--
 .../spark/cli/BatchProfilerCLIOptions.java     | 10 ++-
 .../spark/BatchProfilerIntegrationTest.java    | 91 +++++++++++++++++---
 6 files changed, 189 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 3d7017c..99e8c7e 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -131,6 +131,14 @@ The Batch Profiler requires Spark version 2.3.0+.
 
 ## Running the Profiler
 
+* [Usage](#usage)
+* [Advanced Usage](#advanced-usage)
+* [Spark Execution](#spark-execution)
+* [Kerberos](#kerberos)
+* [Input Formats](#input-formats)
+
+### Usage
+
 A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler. This script makes the following assumptions.
 
   * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
@@ -156,11 +164,28 @@ The Batch Profiler accepts the following arguments when run from the command lin
 
 | Argument | Description
 |--- |---
-| -p, --profiles | The path to a file containing the profile definitions.
-| -c, --config | The path to the profiler properties file.
-| -g, --globals | The path to a properties file containing global properties.
+| -p, --profiles | Path to the profile definitions.
+| -c, --config | Path to the profiler properties file.
+| -g, --globals | Path to the Stellar global config file.
+| -r, --reader | Path to properties for the DataFrameReader.
 | -h, --help | Print the help text.
 
+#### `--profiles`
+
+The path to a file containing the profile definition in JSON.
+
+#### `--config`
+
+The path to a file containing key-value properties for the Profiler. This file would contain the properties described under [Configuring the Profiler](#configuring-the-profiler).
+
+#### `--globals`
+
+The path to a file containing key-value properties that define the global properties. This can be used to customize how certain Stellar functions behave during execution.
+
+#### `--reader`
+
+The path to a file containing key-value properties that are passed to the DataFrameReader when reading the input telemetry. This allows additional customization for how the input telemetry is read.
+
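A note on how the new `--reader` argument is applied: each key-value in the reader file becomes an option on Spark's `DataFrameReader` before the telemetry at `profiler.batch.input.path` is loaded with `profiler.batch.input.format`. A minimal sketch of that flow (illustrative only, not part of this patch; the local `reader.properties` file, the `local` master, and the assumption that the archived telemetry is a single string column are all hypothetical):

```java
import com.google.common.collect.Maps;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.FileInputStream;
import java.util.Properties;

public class ReaderSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().master("local").getOrCreate();

    // values that would come from --config, i.e. the profiler.batch.input.* properties
    String inputFormat = "org.apache.spark.sql.execution.datasources.orc";
    String inputPath = "hdfs://localhost:9000/apps/metron/indexing/orc/*/*";

    // key-values that would come from the file named by --reader (hypothetical file)
    Properties readerProps = new Properties();
    readerProps.load(new FileInputStream("reader.properties"));

    // each reader property is applied as an option on the DataFrameReader
    Dataset<String> telemetry = spark
            .read()
            .options(Maps.fromProperties(readerProps))
            .format(inputFormat)
            .load(inputPath)
            .as(Encoders.STRING());

    System.out.println("telemetry records: " + telemetry.count());
  }
}
```

The `BatchProfiler` change later in this commit does essentially the same thing via `Maps.fromProperties(readerProps)`.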
 ### Spark Execution
 
 Spark supports a number of different [cluster managers](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types). The underlying cluster manager is transparent to the Profiler. To run the Profiler on a particular cluster manager, it is just a matter of setting the appropriate options as defined in the Spark documentation.
@@ -191,10 +216,24 @@ The following command can be useful to review the logs generated when the Profil
 yarn logs -applicationId <application-id>
 ```
 
-#### Kerberos
+### Kerberos
 
 See the Spark documentation for information on running the Batch Profiler in a [secure, kerberized cluster](https://spark.apache.org/docs/latest/running-on-yarn.html#running-in-a-secure-cluster).
 
+### Input Formats
+
+The Profiler can consume archived telemetry stored in a variety of input formats. By default, it is configured to consume the text/json that Metron archives in HDFS. This is often not the best format for archiving telemetry. If you choose a different format, you should be able to configure the Profiler to consume it by doing the following.
+
+1. Edit [`profiler.batch.input.format`](#profilerbatchinputformat) and [`profiler.batch.input.path`](#profilerbatchinputpath) as needed. For example, to read ORC you might do the following.
+
+    `$METRON_HOME/config/batch-profiler.properties`
+    ```
+    profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
+    profiler.batch.input.path=hdfs://localhost:9000/apps/metron/indexing/orc/\*/\*
+    ```
+
+1. If additional options are required for your input format, then use the [`--reader`](#--reader) command-line argument when launching the Batch Profiler as [described here](#advanced-usage).
+
 ## Configuring the Profiler


http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 587b38c..668ee2c 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -25,6 +25,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <spark_antlr_version>4.7</spark_antlr_version>
     </properties>
     <dependencies>
         <dependency>
@@ -36,12 +37,11 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${global_spark_version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>antlr-runtime</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>${spark_antlr_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -53,6 +53,12 @@
             <artifactId>metron-profiler-client</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr4-runtime</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index f999613..d75abc3 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -54,25 +55,29 @@ public class BatchProfiler implements Serializable {
    * Execute the Batch Profiler.
    *
    * @param spark The spark session.
-   * @param properties The profiler configuration properties.
+   * @param profilerProps The profiler configuration properties.
+   * @param globalProperties The Stellar global properties.
+   * @param readerProps The properties passed to the {@link org.apache.spark.sql.DataFrameReader}.
    * @param profiles The profile definitions.
   * @return The number of profile measurements produced.
    */
  public long run(SparkSession spark,
-                 Properties properties,
+                 Properties profilerProps,
                  Properties globalProperties,
+                 Properties readerProps,
                  ProfilerConfig profiles) {
    LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
 
    Map<String, String> globals = Maps.fromProperties(globalProperties);
-   String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
-   String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
+   String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+   String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
    LOG.debug("Loading telemetry from '{}'", inputPath);
 
    // fetch the archived telemetry
    Dataset<String> telemetry = spark
            .read()
+           .options(Maps.fromProperties(readerProps))
            .format(inputFormat)
            .load(inputPath)
            .as(Encoders.STRING());
@@ -85,13 +90,13 @@ public class BatchProfiler implements Serializable {
 
    // build the profiles
    Dataset<ProfileMeasurementAdapter> measurements = routes
-           .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
-           .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+           .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
+           .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
    LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
    // write the profile measurements to HBase
    long count = measurements
-           .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
+           .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
            .agg(sum("value"))
            .head()
            .getLong(0);
@@ -99,4 +104,4 @@ public class BatchProfiler implements Serializable {
 
    return count;
  }
-}
\ No newline at end of file
+}
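For reference, a minimal caller of the reworked `run(...)` above might look like the following sketch; it mirrors the integration tests later in this commit. The wrapper class, the property values, and the `ProfilerConfig` parameter are illustrative, and the package names are assumed to match the existing Metron classes:

```java
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.spark.BatchProfiler;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class BatchProfilerRunSketch {

  /** Runs the Batch Profiler once over local JSON telemetry and returns the measurement count. */
  public static long runOnce(SparkSession spark, ProfilerConfig profiles) {
    // profiler properties; the keys are the ones documented in the README
    Properties profilerProps = new Properties();
    profilerProps.put("profiler.batch.input.format", "text");
    profilerProps.put("profiler.batch.input.path", "src/test/resources/telemetry.json");

    Properties globals = new Properties();      // Stellar global configuration, may be empty
    Properties readerProps = new Properties();  // handed to DataFrameReader.options(...), may be empty

    BatchProfiler profiler = new BatchProfiler();
    return profiler.run(spark, profilerProps, globals, readerProps, profiles);
  }
}
```

Passing an empty `readerProps` keeps the previous behavior, since no extra options are set on the `DataFrameReader`.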
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index bdcf231..29fe4a2 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -37,9 +37,10 @@
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
 
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.CONFIGURATION_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
 
 /**
@@ -54,7 +55,8 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
  *     metron-profiler-spark-<version>.jar \
  *     --config profiler.properties \
  *     --globals global.properties \
- *     --profiles profiles.json
+ *     --profiles profiles.json \
+ *     --reader reader.properties
  * }</pre>
  */
 public class BatchProfilerCLI implements Serializable {
@@ -63,6 +65,7 @@ public class BatchProfilerCLI implements Serializable {
 
   public static Properties globals;
   public static Properties profilerProps;
+  public static Properties readerProps;
   public static ProfilerConfig profiles;
 
   public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
@@ -71,6 +74,7 @@ public class BatchProfilerCLI implements Serializable {
     profilerProps = handleProfilerProperties(commandLine);
     globals = handleGlobals(commandLine);
     profiles = handleProfileDefinitions(commandLine);
+    readerProps = handleReaderProperties(commandLine);
 
     // the batch profiler must use 'event time'
     if(!profiles.getTimestampField().isPresent()) {
@@ -88,7 +92,7 @@ public class BatchProfilerCLI implements Serializable {
             .getOrCreate();
 
     BatchProfiler profiler = new BatchProfiler();
-    long count = profiler.run(spark, profilerProps, globals, profiles);
+    long count = profiler.run(spark, profilerProps, globals, readerProps, profiles);
 
     LOG.info("Profiler produced {} profile measurement(s)", count);
   }
@@ -117,13 +121,31 @@ public class BatchProfilerCLI implements Serializable {
    */
   private static Properties handleProfilerProperties(CommandLine commandLine) throws IOException {
     Properties config = new Properties();
-    if(CONFIGURATION_FILE.has(commandLine)) {
-      String propertiesPath = CONFIGURATION_FILE.get(commandLine);
+    if(PROFILER_PROPS_FILE.has(commandLine)) {
+      String propertiesPath = PROFILER_PROPS_FILE.get(commandLine);
 
       LOG.info("Loading profiler properties from '{}'", propertiesPath);
       config.load(new FileInputStream(propertiesPath));
 
-      LOG.info("Properties = {}", config.toString());
+      LOG.info("Profiler properties = {}", config.toString());
+    }
+    return config;
+  }
+
+  /**
+   * Load the properties for the {@link org.apache.spark.sql.DataFrameReader}.
+   *
+   * @param commandLine The command line.
+   */
+  private static Properties handleReaderProperties(CommandLine commandLine) throws IOException {
+    Properties config = new Properties();
+    if(READER_PROPS_FILE.has(commandLine)) {
+      String readerPropsPath = READER_PROPS_FILE.get(commandLine);
+
+      LOG.info("Loading reader properties from '{}'", readerPropsPath);
+      config.load(new FileInputStream(readerPropsPath));
+
+      LOG.info("Reader properties = {}", config.toString());
     }
     return config;
   }
@@ -171,4 +193,8 @@ public class BatchProfilerCLI implements Serializable {
   public static ProfilerConfig getProfiles() {
     return profiles;
   }
-}
\ No newline at end of file
+
+  public static Properties getReaderProps() {
+    return readerProps;
+  }
+}


http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index f5dfe12..d58728a 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -36,12 +36,12 @@ import java.util.function.Supplier;
 
 public enum BatchProfilerCLIOptions {
 
   PROFILE_DEFN_FILE(() -> {
-    Option o = new Option("p", "profiles", true, "Path to a file containing profile definitions.");
+    Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
     o.setRequired(true);
     return o;
   }),
-  CONFIGURATION_FILE(() -> {
+  PROFILER_PROPS_FILE(() -> {
     Option o = new Option("c", "config", true, "Path to the profiler properties file.");
     o.setRequired(false);
     return o;
@@ -53,6 +53,12 @@ public enum BatchProfilerCLIOptions {
     return o;
   }),
+  READER_PROPS_FILE(() -> {
+    Option o = new Option("r", "reader", true, "Path to properties for the DataFrameReader.");
+    o.setRequired(false);
+    return o;
+  }),
+
   HELP(() -> {
     Option o = new Option("h", "help", false, "Usage instructions.");
     o.setRequired(false);
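Taken together with the CLI changes above, the new option can be exercised end to end. A hedged sketch of invoking the CLI programmatically with the renamed and added flags (normally this goes through `spark-submit` or `$METRON_HOME/bin/start_batch_profiler.sh`; the property file names below are illustrative):

```java
import org.apache.metron.profiler.spark.cli.BatchProfilerCLI;

public class BatchProfilerLaunchSketch {
  public static void main(String[] args) throws Exception {
    // flags added or renamed by this patch; the file names are hypothetical
    String[] profilerArgs = {
        "--config",   "profiler.properties",   // profiler properties, e.g. profiler.batch.input.format
        "--globals",  "global.properties",     // Stellar global configuration
        "--profiles", "profiles.json",         // profile definitions, which must define a timestamp field
        "--reader",   "reader.properties"      // DataFrameReader options, e.g. header=true for CSV input
    };
    BatchProfilerCLI.main(profilerArgs);
  }
}
```

A `reader.properties` containing `header=true` would, for example, tell a CSV reader to expect a header record, matching the CSV integration test below.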
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 376623c..87c4246 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -30,11 +30,14 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -82,8 +85,12 @@ public class BatchProfilerIntegrationTest {
   private static String profileJson;
   private static SparkSession spark;
   private Properties profilerProperties;
+  private Properties readerProperties;
   private StellarStatefulExecutor executor;
 
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
   @BeforeClass
   public static void setupSpark() {
     SparkConf conf = new SparkConf()
@@ -105,12 +112,9 @@
   @Before
   public void setup() {
+    readerProperties = new Properties();
     profilerProperties = new Properties();
 
-    // the input telemetry is read from the local filesystem
-    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
-    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
-
     // the output will be written to a mock HBase table
     String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
     String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
@@ -147,15 +151,80 @@
    * produced will center around this date.
    */
   @Test
-  public void testBatchProfiler() throws Exception {
-    // run the batch profiler
+  public void testBatchProfilerWithJSON() throws Exception {
+    // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithORC() throws Exception {
+    // re-write the test data as ORC
+    String pathToORC = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .format("org.apache.spark.sql.execution.datasources.orc")
+            .save(pathToORC);
+
+    // tell the profiler to use the ORC input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithCSV() throws Exception {
+    // re-write the test data as a CSV with a header record
+    String pathToCSV = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .option("header", "true")
+            .format("csv")
+            .save(pathToCSV);
+
+    // tell the profiler to use the CSV input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  /**
+   * Validates the profiles that were built.
+   *
+   * These tests use the Batch Profiler to seed two profiles with archived telemetry. The first profile
+   * called 'count-by-ip', counts the number of messages by 'ip_src_addr'. The second profile called
+   * 'total-count', counts the total number of messages.
+   */
+  private void validateProfiles() {
+    // the max timestamp in the data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
 
-    // validate the measurements written by the batch profiler using `PROFILE_GET`
-    // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
-    assign("lastTimestamp", "1530978728982L");
-    assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+    // the 'window' looks up to 5 hours before the max timestamp
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
 
     // there are 26 messages where ip_src_addr = 192.168.66.1
     assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));