GitHub user nickwallen reopened a pull request: https://github.com/apache/metron/pull/1191
METRON-1772 Support alternative input formats in the Batch Profiler [Feature Branch]

By default, the Batch Profiler reads the text/json that Metron lands in HDFS as the source of archived telemetry. In many cases this is not the best format for archiving telemetry, and users may choose to store it in alternative formats. The Batch Profiler should support alternatives like ORC when reading the input telemetry, and the user should be able to customize the Profiler based on how they have chosen to archive their telemetry.

- Updated the README to describe how to read alternative input formats.
- Added a command line option that allows the user to pass custom options to the `DataFrameReader`, which may be needed depending on how the telemetry is archived. For example, this allows the user to pass reader options like `quote` and `nullValue` needed by [csv](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html#csv-java.lang.String...-), or `allowSingleQuotes` and `allowComments` needed by [json](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameReader.html#json-scala.collection.Seq-).
- Added an integration test that validates that the Batch Profiler can read ORC data.
- Added an integration test that validates that the Batch Profiler can read CSV data. CSV was added as a test case to validate a user providing custom options to the `DataFrameReader`.

This is a pull request against the `METRON-1699-create-batch-profiler` feature branch and depends on the following PRs. By filtering on the last commit, this PR can be reviewed before the others are reviewed and merged.

- [ ] #1189

## Testing

1. Stand up a development environment.

   ```
   cd metron-deployment/development/centos6
   vagrant up
   vagrant ssh
   sudo su -
   ```

1. Validate the environment by ensuring alerts are visible within the Alerts UI and that the Metron Service Check in Ambari passes.
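To make the custom `DataFrameReader` options described above concrete, here is a spark-shell sketch of the kind of key/value pairs the new command line option forwards when telemetry is archived as CSV. The HDFS path and option values are illustrative only and are not part of this PR:

```scala
// Illustrative only: reading CSV-archived telemetry with custom reader options.
// The new command line option forwards key/value pairs like these to the
// underlying DataFrameReader.
val telemetry = spark.read
  .format("csv")
  .option("header", "true")     // first line of each file names the fields
  .option("quote", "\"")        // character used to quote values
  .option("nullValue", "-")     // string that should be read back as null
  .load("hdfs://localhost:8020/apps/metron/csv/")  // hypothetical path
```

Without options like these, a value such as `-` would be read back literally rather than as null, which is why the reader options need to be user-configurable.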
1. Allow some telemetry to be archived in HDFS.

   ```
   [root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/*/* | wc -l
   6916
   ```

1. Shut down the Metron topologies, Storm, Elasticsearch, Kibana, and MapReduce2 to free up some resources on the VM.

1. Use Ambari to install Spark (version 2.3+): Actions > Add Service > Spark2.

1. Make sure Spark can talk to HBase.

   ```
   SPARK_HOME=/usr/hdp/current/spark2-client
   cp /usr/hdp/current/hbase-client/conf/hbase-site.xml $SPARK_HOME/conf/
   ```

1. Follow the [Getting Started](https://github.com/apache/metron/tree/feature/METRON-1699-create-batch-profiler/metron-analytics/metron-profiler-spark#getting-started) section of the README to seed a basic profile using the text/json telemetry that is archived in HDFS.

1. Create the profile.

   ```
   [root@node1 ~]# source /etc/default/metron
   [root@node1 ~]# cat $METRON_HOME/config/zookeeper/profiler.json
   {
     "profiles": [
       {
         "profile": "hello-world",
         "foreach": "'global'",
         "init": { "count": "0" },
         "update": { "count": "count + 1" },
         "result": "count"
       }
     ],
     "timestampField": "timestamp"
   }
   ```

1. Edit the Batch Profiler properties to point it at the correct input path (changed `localhost:9000` to `localhost:8020`).

   ```
   [root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
   spark.app.name=Batch Profiler
   spark.master=local
   spark.sql.shuffle.partitions=8
   profiler.batch.input.path=hdfs://localhost:8020/apps/metron/indexing/indexed/*/*
   profiler.batch.input.format=text
   profiler.period.duration=15
   profiler.period.duration.units=MINUTES
   ```

1. Edit logging as you see fit. For example, set Spark logging to WARN and Profiler logging to DEBUG. This is described in the README.

1. Run the Batch Profiler.

   ```
   $METRON_HOME/bin/start_batch_profiler.sh
   ```

1. Launch the Stellar REPL and retrieve the profile data. Save this result, as it will be used for validation in subsequent steps.

   ```
   [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
   ...
   Stellar, Go!
   Functions are loading lazily in the background and will be unavailable until loaded fully.
   ...
   [Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
   [ProfilePeriod{period=1707332, durationMillis=900000}, ProfilePeriod{period=1707333, durationMillis=900000}, ProfilePeriod{period=1707334, durationMillis=900000}, ProfilePeriod{period=1707335, durationMillis=900000}, ProfilePeriod{period=1707336, durationMillis=900000}, ProfilePeriod{period=1707337, durationMillis=900000}, ProfilePeriod{period=1707338, durationMillis=900000}, ProfilePeriod{period=1707339, durationMillis=900000}, ProfilePeriod{period=1707340, durationMillis=900000}]
   [Stellar]>>> PROFILE_GET("hello-world","global", window)
   [1020, 5066, 830]
   ```

1. Delete the profiler data.

   ```
   echo "truncate 'profiler'" | hbase shell
   ```

1. Create a new directory in HDFS for the ORC data that we are about to generate.

   ```
   export HADOOP_USER_NAME=hdfs
   hdfs dfs -mkdir /apps/metron/indexing/orc
   hdfs dfs -chown metron:hadoop /apps/metron/indexing/orc
   ```

1. You may also need to create this directory for Spark.

   ```
   export HADOOP_USER_NAME=hdfs
   hdfs dfs -mkdir /spark2-history
   ```

1. Launch the Spark shell.

   ```
   export SPARK_MAJOR_VERSION=2
   export HADOOP_USER_NAME=hdfs
   spark-shell
   ```

1. Use the Spark shell to transform the text/json telemetry to ORC.

   ```
   scala> val jsonPath = "hdfs://localhost:8020/apps/metron/indexing/indexed/*/*"
   jsonPath: String = hdfs://localhost:8020/apps/metron/indexing/indexed/*/*

   scala> val orcPath = "hdfs://localhost:8020/apps/metron/orc/"
   orcPath: String = hdfs://localhost:8020/apps/metron/orc/

   scala> val msgs = spark.read.format("text").load(jsonPath).as[String]
   msgs: org.apache.spark.sql.Dataset[String] = [value: string]

   scala> msgs.count
   res0: Long = 6916

   scala> msgs.write.mode("overwrite").format("org.apache.spark.sql.execution.datasources.orc").save(orcPath)

   scala> spark.read.format("org.apache.spark.sql.execution.datasources.orc").load(orcPath).as[String].count
   res3: Long = 6916
   ```
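As a sanity check on the `ProfilePeriod` values retrieved earlier: with `profiler.period.duration=15` minutes, each period spans 900000 milliseconds, and a fixed period id can be read as an epoch timestamp in milliseconds divided by that duration. The helper below is a hypothetical illustration of that arithmetic, not Metron's actual implementation:

```scala
// Illustrative arithmetic only; not part of Metron's API.
val durationMillis = 15L * 60 * 1000          // 900000, matching durationMillis above

// A fixed period id, read as floor(epochMillis / durationMillis).
def periodOf(epochMillis: Long): Long = epochMillis / durationMillis

// The first period retrieved above would then start at:
val startMillis = 1707332L * durationMillis   // 1536598800000 = 2018-09-10T17:00:00Z

// A 2 hour window contains 2 * 60 / 15 = 8 full periods, so
// PROFILE_FIXED(2, "HOURS") returns the 9 period ids that cover its boundaries.
```

Under that reading, periods 1707332 through 1707340 cover roughly 17:00 to 19:15 UTC on 2018-09-10, which lines up with when this telemetry was captured.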
1. Edit `$METRON_HOME/config/batch-profiler.properties` so that the Batch Profiler consumes the telemetry stored as ORC.

   ```
   [root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
   spark.app.name=Batch Profiler
   spark.master=local
   spark.sql.shuffle.partitions=8
   profiler.batch.input.path=hdfs://localhost:8020/apps/metron/orc/
   profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
   profiler.period.duration=15
   profiler.period.duration.units=MINUTES
   ```

1. Run the Batch Profiler again. It will now consume the ORC data.

   ```
   $METRON_HOME/bin/start_batch_profiler.sh
   ```

1. Launch the Stellar REPL again and retrieve the profile data. The data should match the profile data previously generated from the text/json telemetry.

   ```
   [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
   ...
   Stellar, Go!
   Functions are loading lazily in the background and will be unavailable until loaded fully.
   ...
   [Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
   [ProfilePeriod{period=1707332, durationMillis=900000}, ProfilePeriod{period=1707333, durationMillis=900000}, ProfilePeriod{period=1707334, durationMillis=900000}, ProfilePeriod{period=1707335, durationMillis=900000}, ProfilePeriod{period=1707336, durationMillis=900000}, ProfilePeriod{period=1707337, durationMillis=900000}, ProfilePeriod{period=1707338, durationMillis=900000}, ProfilePeriod{period=1707339, durationMillis=900000}, ProfilePeriod{period=1707340, durationMillis=900000}]
   [Stellar]>>> PROFILE_GET("hello-world","global", window)
   [1020, 5066, 830]
   ```

1. Notice that the output is exactly the same no matter which input format was used.

## Pull Request Checklist

- [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
- [x] Have you included steps or a guide to how the change may be verified and tested manually?
- [x] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
- [x] Have you written or updated unit tests and/or integration tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [x] Have you verified the basic functionality of the build by building and running locally with the Vagrant full-dev environment or the equivalent?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1772

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1191.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1191

----

commit eb419d8bdc6e60900bb08142d4d2bb50b2bd8c23
Author: Nick Allen <nick@...>
Date:   2018-09-06T18:47:28Z

    METRON-1770 Add Docs for Running the Profiler with Spark on YARN

commit b17e328fed0f3f51f14383ae48cb526b0993d314
Author: Nick Allen <nick@...>
Date:   2018-09-07T21:07:44Z

    METRON-1772 Support alternative input formats in the Batch Profiler

commit 0bd24b02b953b65d7c4b36283973f71ce4715f45
Author: Nick Allen <nick@...>
Date:   2018-09-10T16:15:54Z

    Fixed dependencies to avoid clash between Antlr versions

commit 185cf6d9930de215c45f7cc228613daee4fc335f
Author: Nick Allen <nick@...>
Date:   2018-09-10T18:04:23Z

    Improved README for using alternative input formats

commit 6f165632d10eb9e8b2ad0414d77ec4ec562ead28
Author: Nick Allen <nick@...>
Date:   2018-09-10T21:28:37Z

    Trying to fix IT errors with Antlr

commit 31e7b071e911d867806a77357522442f38bc6884
Author: Nick Allen <nick@...>
Date:   2018-09-10T21:34:32Z

    Merge remote-tracking branch 'apache/feature/METRON-1699-create-batch-profiler' into METRON-1772

commit 2cbdb84e323ff0f2ece98659f4be94843bf7f2d5
Author: Nick Allen <nick@...>
Date:   2018-09-18T17:37:38Z

    Merge branch 'feature/METRON-1699-create-batch-profiler' into METRON-1772

----