METRON-1230: As a stopgap prior to METRON-777, add more simplistic sideloading of custom Parsers closes apache/incubator-metron#785
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/a262f87f Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/a262f87f Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/a262f87f Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: a262f87f0c31637d76a77a538976673c87188eeb Parents: 5d3e73a Author: cstella <ceste...@gmail.com> Authored: Tue Jan 16 12:37:59 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Tue Jan 16 12:37:59 2018 -0500 ---------------------------------------------------------------------- metron-interface/metron-rest/pom.xml | 3 + .../metron/rest/MetronRestApplication.java | 2 + .../impl/SensorParserConfigServiceImpl.java | 20 +- .../apache/metron/rest/util/ParserIndex.java | 92 ++++++ .../metron-rest/src/main/scripts/metron-rest.sh | 17 + .../metron-parsers/3rdPartyParser.md | 330 +++++++++++++++++++ metron-platform/metron-parsers/pom.xml | 6 + .../topology/MergeAndShadeTransformer.java | 101 ++++++ .../parsers/topology/ParserTopologyCLI.java | 18 +- .../src/main/scripts/start_parser_topology.sh | 9 +- 10 files changed, 576 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/pom.xml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml index 302b784..44bad97 100644 --- a/metron-interface/metron-rest/pom.xml +++ b/metron-interface/metron-rest/pom.xml @@ -189,6 +189,7 @@ <groupId>org.apache.metron</groupId> <artifactId>metron-parsers</artifactId> <version>${project.parent.version}</version> + <scope>provided</scope> <exclusions> <exclusion> <groupId>org.apache.hbase</groupId> @@ -317,6 +318,7 @@ <groupId>org.apache.metron</groupId> <artifactId>metron-indexing</artifactId> 
<version>${project.parent.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.metron</groupId> @@ -417,6 +419,7 @@ <goal>shade</goal> </goals> <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java index ac89b1d..5135849 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java @@ -17,6 +17,7 @@ */ package org.apache.metron.rest; +import org.apache.metron.rest.util.ParserIndex; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -24,6 +25,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class MetronRestApplication { public static void main(String[] args) { + ParserIndex.reload(); SpringApplication.run(MetronRestApplication.class, args); } } http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java index c460e3c..85b84b8 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java @@ -38,10 +38,12 @@ import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.ParseMessageRequest; import org.apache.metron.rest.service.GrokService; import org.apache.metron.rest.service.SensorParserConfigService; +import org.apache.metron.rest.util.ParserIndex; import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.zookeeper.KeeperException; import org.json.simple.JSONObject; import org.reflections.Reflections; +import org.reflections.util.ConfigurationBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -118,29 +120,15 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService @Override public Map<String, String> getAvailableParsers() { - if (availableParsers == null) { - availableParsers = new HashMap<>(); - Set<Class<? extends MessageParser>> parserClasses = getParserClasses(); - parserClasses.forEach(parserClass -> { - if (!"BasicParser".equals(parserClass.getSimpleName())) { - availableParsers.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""), - parserClass.getName()); - } - }); - } - return availableParsers; + return ParserIndex.INSTANCE.getIndex(); } @Override public Map<String, String> reloadAvailableParsers() { - availableParsers = null; + ParserIndex.INSTANCE.reload(); return getAvailableParsers(); } - private Set<Class<? 
extends MessageParser>> getParserClasses() { - Reflections reflections = new Reflections("org.apache.metron.parsers"); - return reflections.getSubTypesOf(MessageParser.class); - } @Override public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) throws RestException { http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java new file mode 100644 index 0000000..ef9c83c --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.rest.util; + +import org.apache.metron.parsers.interfaces.MessageParser; +import org.reflections.Reflections; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.net.URL; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Index the parsers. Analyzing the classpath is a costly operation, so caching it makes sense. + * Eventually, we will probably want to have a timer that periodically reindexes so that new parsers show up. + */ +public enum ParserIndex { + INSTANCE; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static Set<Class<? extends MessageParser>> index; + private static Map<String, String> availableParsers ; + + static { + load(); + } + + public synchronized Map<String, String> getIndex() { + if(availableParsers == null) { + load(); + } + return availableParsers; + } + + public synchronized Set<Class<? extends MessageParser>> getClasses() { + if(index == null) { + load(); + } + return index; + } + + public static void reload() { + load(); + } + + /** + * To handle the situation where classpath is specified in the manifest of the jar, we have to augment the URLs. + * This happens as part of the surefire plugin as well as elsewhere in the wild (especially in maven when running tests. ;). + * @param classLoaders + * @return A collection of URLs representing the effective classpath URLs + */ + private static Collection<URL> effectiveClassPathUrls(ClassLoader... 
classLoaders) { + return ClasspathHelper.forManifest(ClasspathHelper.forClassLoader(classLoaders)); + } + + private static synchronized void load() { + LOG.debug("Starting Parser Index Load"); + ClassLoader classLoader = ParserIndex.class.getClassLoader(); + Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(effectiveClassPathUrls(classLoader))); + Set<Class<? extends MessageParser>> indexLoc = reflections.getSubTypesOf(MessageParser.class); + Map<String, String> availableParsersLoc = new HashMap<>(); + indexLoc.forEach(parserClass -> { + if (!"BasicParser".equals(parserClass.getSimpleName())) { + availableParsersLoc.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""), + parserClass.getName()); + } + }); + LOG.debug("Finished Parser Index Load; found {} parsers, indexed {} parsers", indexLoc.size(), availableParsersLoc.size()); + index = indexLoc; + availableParsers = availableParsersLoc; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/scripts/metron-rest.sh ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh index 638589a..f9a2b69 100644 --- a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh +++ b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh @@ -21,6 +21,12 @@ if [ -z "${METRON_JDBC_PASSWORD}" ]; then echo "METRON_JDBC_PASSWORD unset. Exiting." 
exit 1 fi +## Join a list by a character +function join_by { + local IFS="$1" + shift + echo "$*" +} METRON_VERSION=${project.version} METRON_HOME="${METRON_HOME:-/usr/metron/${METRON_VERSION}}" @@ -29,6 +35,8 @@ METRON_REST_PORT=8082 METRON_SYSCONFIG="${METRON_SYSCONFIG:-/etc/default/metron}" METRON_LOG_DIR="${METRON_LOG_DIR:-/var/log/metron}" METRON_PID_FILE="${METRON_PID_FILE:-/var/run/metron/metron-rest.pid}" +PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib} +PARSER_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar) echo "METRON_VERSION=${METRON_VERSION}" echo "METRON_HOME=${METRON_HOME}" @@ -47,6 +55,15 @@ rest_jar_pattern="${METRON_HOME}/lib/metron-rest*.jar" rest_files=( ${rest_jar_pattern} ) echo "Default metron-rest jar is: ${rest_files[0]}" METRON_REST_CLASSPATH+=":${rest_files[0]}" +METRON_REST_CLASSPATH+=":$PARSER_LIB" + +if [ -d "$PARSER_CONTRIB" ]; then + contrib_jar_pattern="${PARSER_CONTRIB}/*.jar" + contrib_list=( $contrib_jar_pattern ) # expand the glob to a list + contrib_classpath=$(join_by : "${contrib_list[@]}") #join the list by a colon + echo "Parser Contrib jars are: $contrib_classpath" + METRON_REST_CLASSPATH+=":${contrib_classpath}" +fi echo "METRON_SPRING_PROFILES_ACTIVE=${METRON_SPRING_PROFILES_ACTIVE}" http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/3rdPartyParser.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/3rdPartyParser.md b/metron-platform/metron-parsers/3rdPartyParser.md new file mode 100644 index 0000000..61095ea --- /dev/null +++ b/metron-platform/metron-parsers/3rdPartyParser.md @@ -0,0 +1,330 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Custom Metron Parsers + +We have many stock parsers for normal operations. Some of these are +networking and cybersecurity focused (e.g. the ASA Parser), some of +these are general purpose (e.g. the CSVParser), but inevitably users +will want to extend the system to process their own data formats. To +enable this, this is a walkthrough of how to create and use a custom +parser within Metron. + +# Writing A Custom Parser +Before we can use a parser, we will need to create a custom parser. The +parser is the workhorse of Metron ingest. It provides the mapping +between the raw data coming in via the Kafka value and a `JSONObject`, +the internal data structure provided. + +## Implementation + +In order to create a custom parser, we need to do one of the following: +* Write a class which conforms to the `org.apache.metron.parsers.interfaces.MessageParser<JSONObject>` and `java.util.Serializable` interfaces + * Implement `init()`, `validate(JSONObject message)`, and `List<JSONObject> parse(byte[] rawMessage)` +* Write a class which extends `org.apache.metron.parsers.BasicParser` + * Provides a convenience implementation of `validate` which ensures that the `timestamp` and `original_string` fields exist. 
+ +## Example + +In order to illustrate how this might be done, let's create a very +simple parser that takes a comma separated pair and creates a couple of +fields: +* `original_string` -- the raw data +* `timestamp` -- the current time +* `first` -- the first field of the comma separated pair +* `last` -- the last field of the comma separated pair + +For this demonstration, let's create a maven project to compile our +project. We'll call it `extra_parsers`, so in your workspace, let's set +up the maven project: +* Create the maven infrastructure for `extra_parsers` via +``` +mkdir -p extra_parsers/src/{main,test}/java +``` +* Create a pom file indicating how we should build our parsers by + editing `extra_parsers/pom.xml` with the following content: +``` +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.3rdparty</groupId> + <artifactId>extra-parsers</artifactId> + <packaging>jar</packaging> + <version>1.0-SNAPSHOT</version> + <name>extra-parsers</name> + <url>http://thirdpartysoftware.org</url> + <properties> + <!-- The java version to conform to. Metron works all the way to 1.8 --> + <java_version>1.8</java_version> + <!-- The version of Metron that we'll be targetting. --> + <metron_version>0.4.1</metron_version> + <!-- To complete the simulation, we'll depend on a common dependency --> + <guava_version>19.0</guava_version> + <!-- We will shade our dependencies to create a single jar at the end --> + <shade_version>2.4.3</shade_version> + </properties> + <dependencies> + <!-- + We want to depend on Metron, but ensure that the scope is "provided" + as we do not want to include it in our bundle. 
+ --> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-parsers</artifactId> + <version>${metron_version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava_version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <!-- We will set up the shade plugin to create a single jar at the + end of the build lifecycle. We will exclude some things and + relocate others to simulate a real situation. + + One thing to note is that it's a good practice to shade and + relocate common libraries that may be dependencies in Metron. + Your jar will be merged with the parsers jar, so the metron + version will be included for all overlapping classes. + So, shade and relocate to ensure that YOUR version of the library is used. + --> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <artifactSet> + <excludes> + <!-- Exclude slf4j for no reason other than to illustrate how to exclude dependencies. + The metron team has nothing against slf4j. 
:-) + --> + <exclude>*slf4j*</exclude> + </excludes> + </artifactSet> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>uber</shadedClassifierName> + <filters> + <filter> + <!-- Sometimes these get added and confuse the uber jar out of shade --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <!-- Relocate guava as it's used in Metron and I really want 19.0 --> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>com.thirdparty.guava</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <!-- We can also exclude by artifactId and groupId --> + <exclude>storm:storm-core:*</exclude> + <exclude>storm:storm-lib:*</exclude> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + <!-- + We want to make sure we compile using java 1.8. 
+ --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <forceJavacCompilerUse>true</forceJavacCompilerUse> + <source>${java_version}</source> + <compilerArgument>-Xlint:unchecked</compilerArgument> + <target>${java_version}</target> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + </plugins> + </build> +</project> +``` +* Now let's create our parser `com.thirdparty.SimpleParser` by creating the file `extra_parsers/src/main/java/com/thirdparty/SimpleParser.java` with the following content: +``` +package com.thirdparty; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.metron.parsers.BasicParser; +import org.json.simple.JSONObject; + +import java.util.List; +import java.util.Map; + +public class SimpleParser extends BasicParser { + @Override + public void init() { + + } + + @Override + public List<JSONObject> parse(byte[] bytes) { + String input = new String(bytes); + Iterable<String> it = Splitter.on(",").split(input); + JSONObject ret = new JSONObject(); + ret.put("original_string", input); + ret.put("timestamp", System.currentTimeMillis()); + ret.put("first", Iterables.getFirst(it, "missing")); + ret.put("last", Iterables.getLast(it, "missing")); + return ImmutableList.of(ret); + } + + @Override + public void configure(Map<String, Object> map) { + + } +} +``` +* Compile the parser via `mvn clean package` in `extra_parsers` + +This will create a jar containing your parser and its dependencies (sans Metron dependencies) in `extra_parsers/target/extra-parsers-1.0-SNAPSHOT-uber.jar` + +# Deploying Your Custom Parser + +In order to deploy your newly built custom parser, you would place the jar file above in the `$METRON_HOME/parser_contrib` directory on the Metron host (i.e. 
any host you would start parsers from or, alternatively, where the Metron REST is hosted). + +## Example + +Let's work through deploying the example above. + +### Preliminaries + +We assume that the following environment variables are set: +* `METRON_HOME` - the home directory for metron +* `ZOOKEEPER` - The zookeeper quorum (comma separated with port specified: e.g. `node1:2181` for full-dev) +* `BROKERLIST` - The Kafka broker list (comma separated with port specified: e.g. `node1:6667` for full-dev) +* `ES_HOST` - The elasticsearch master (and port) e.g. `node1:9200` for full-dev. + +Also, this assumes that you are not using a kerberized cluster. If you are, then the parser start command will adjust slightly to include the security protocol. + +### Copy the jar file up + +Copy the jar file located in `extra_parsers/target/extra-parsers-1.0-SNAPSHOT-uber.jar` to `$METRON_HOME/parser_contrib` and ensure the permissions are such that the `metron` user can read and execute. + +### Restart the REST service in Ambari + +In order for new parsers to be picked up, the REST service must be restarted. You can do that from within Ambari by restarting the `Metron REST` service. + +### Push the Zookeeper Configs + +Now push the config to Zookeeper with the following command: +`$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ -z $ZOOKEEPER` + + +### Create a Kafka Topic + +Create a kafka topic, let's call it `test` via: +`/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic test --partitions 1 --replication-factor 1` + +Note, in a real deployment, that topic would be named something more descriptive and would have replication factor and partitions set to something less trivial. 
+ +### Configure Test Parser + +Create a file called `$METRON_HOME/config/zookeeper/parsers/test.json` with the following content: +``` +{ + "parserClassName":"com.thirdparty.SimpleParser", + "sensorTopic":"test" +} +``` + +### Start Parser +Now we can start the parser and send some data through: +* Start the parser +``` +$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s test +``` +* Send example data through: +``` +echo "apache,metron" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic test +``` +* Validate data was written in ES: +``` +curl -XPOST "http://$ES_HOST/test*/_search?pretty" -d ' +{ + "_source" : [ "original_string", "timestamp", "first", "last"] +} +' +``` +This should yield something like: +``` +{ + "took" : 23, + "timed_out" : false, + "_shards" : { + "total" : 1, + "successful" : 1, + "failed" : 0 + }, + "hits" : { + "total" : 1, + "max_score" : 1.0, + "hits" : [ { + "_index" : "test_index_2017.10.04.17", + "_type" : "test_doc", + "_id" : "3ae4dd4d-8c09-4f2a-93c0-26ec5508baaa", + "_score" : 1.0, + "_source" : { + "original_string" : "apache,metron", + "last" : "metron", + "first" : "apache", + "timestamp" : 1507138373223 + } + } ] + } +} +``` + +### Via the Management UI + +As long as the REST service is restarted after new parsers are added to `$METRON_HOME/parser_contrib`, they are available in the UI for creating and deploying parsers. 
http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml index 18377d3..f856654 100644 --- a/metron-platform/metron-parsers/pom.xml +++ b/metron-platform/metron-parsers/pom.xml @@ -125,6 +125,12 @@ </dependency> <dependency> <groupId>org.apache.storm</groupId> + <artifactId>storm-rename-hack</artifactId> + <version>${global_storm_version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${global_storm_version}</version> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java new file mode 100644 index 0000000..eaadf71 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers.topology; + +import com.google.common.base.Splitter; +import org.apache.storm.daemon.JarTransformer; +import org.apache.storm.hack.StormShadeTransformer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.lang.invoke.MethodHandles; +import java.util.HashSet; +import java.util.Set; +import java.util.jar.JarEntry; +import java.util.jar.JarInputStream; +import java.util.jar.JarOutputStream; + +/** + * This is a storm jar transformer that will add in additional jars pulled from an + * environment variable. The jars will be merged with the main uber jar and then + * the resulting jar will be shaded and relocated according to the StormShadeTransformer. 
+ * + */ +public class MergeAndShadeTransformer implements JarTransformer { + public static final String EXTRA_JARS_ENV = "EXTRA_JARS"; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private StormShadeTransformer underlyingTransformer = new StormShadeTransformer(); + @Override + public void transform(InputStream input, OutputStream output) throws IOException { + String extraJars = System.getenv().get(EXTRA_JARS_ENV); + if(extraJars == null || extraJars.length() == 0) { + underlyingTransformer.transform(input, output); + return; + } + File tmpFile = File.createTempFile("metron", "jar"); + tmpFile.deleteOnExit(); + Set<String> entries = new HashSet<>(); + try (JarOutputStream jout = new JarOutputStream(new BufferedOutputStream(new FileOutputStream(tmpFile)))) { + try (JarInputStream jin = new JarInputStream(new BufferedInputStream(input))){ + copy(jin, jout, entries); + } + for (String fileStr : Splitter.on(",").split(extraJars)) { + File f = new File(fileStr); + if (!f.exists()) { + continue; + } + LOG.info("Merging jar {} from {}", f.getName(), f.getAbsolutePath()); + try (JarInputStream jin = new JarInputStream(new BufferedInputStream(new FileInputStream(f)))) { + copy(jin, jout, entries); + } + } + } + underlyingTransformer.transform(new BufferedInputStream(new FileInputStream(tmpFile)), output); + } + + /** + * Merges two jars. The first jar will get merged into the output jar. + * A running set of jar entries is kept so that duplicates are skipped. + * This has the side-effect that the first instance of a given entry will be added + * and all subsequent entries are skipped. + * + * @param jin The input jar + * @param jout The output jar + * @param entries The set of existing entries. Note that this set will be mutated as part of this call. + * @return The set of entries. 
+ * @throws IOException + */ + private Set<String> copy(JarInputStream jin, JarOutputStream jout, Set<String> entries) throws IOException { + byte[] buffer = new byte[1024]; + for(JarEntry entry = jin.getNextJarEntry(); entry != null; entry = jin.getNextJarEntry()) { + if(entries.contains(entry.getName())) { + continue; + } + LOG.debug("Merging jar entry {}", entry.getName()); + entries.add(entry.getName()); + jout.putNextEntry(entry); + int len = 0; + while( (len = jin.read(buffer)) > 0 ) { + jout.write(buffer, 0, len); + } + } + return entries; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index b5ee628..4ce0508 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -23,10 +23,6 @@ import org.apache.metron.storm.kafka.flux.SpoutConfiguration; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Joiner; @@ -285,7 +281,19 @@ public class ParserTopologyCLI { } private static CommandLine parse(Options options, String[] args) { - CommandLineParser parser = new PosixParser(); + /* + * The 
general gist is that in order to pass args to storm jar, + * we have to disregard options that we don't know about in the CLI. + * Storm will ignore our args, we have to do the same. + */ + CommandLineParser parser = new PosixParser() { + @Override + protected void processOption(String arg, ListIterator iter) throws ParseException { + if(getOptions().hasOption(arg)) { + super.processOption(arg, iter); + } + } + }; try { return ParserOptions.parse(parser, args); } catch (ParseException pe) { http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh index 8faf89e..00ac809 100755 --- a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh +++ b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh @@ -19,4 +19,11 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=metron-parsers-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@" +PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib} +if [ -d "$PARSER_CONTRIB" ]; then + export STORM_EXT_CLASSPATH=$METRON_HOME/lib/$TOPOLOGY_JAR + export EXTRA_JARS=$(ls -m $PARSER_CONTRIB/*.jar | tr -d ' ' | tr -d '\n' | sed 's/\/\//\//g') + storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@" -c client.jartransformer.class=org.apache.metron.parsers.topology.MergeAndShadeTransformer +else + storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@" +fi