METRON-1230: As a stopgap prior to METRON-777, add more simplistic sideloading 
of custom Parsers closes apache/incubator-metron#785


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/a262f87f
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/a262f87f
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/a262f87f

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: a262f87f0c31637d76a77a538976673c87188eeb
Parents: 5d3e73a
Author: cstella <ceste...@gmail.com>
Authored: Tue Jan 16 12:37:59 2018 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Tue Jan 16 12:37:59 2018 -0500

----------------------------------------------------------------------
 metron-interface/metron-rest/pom.xml            |   3 +
 .../metron/rest/MetronRestApplication.java      |   2 +
 .../impl/SensorParserConfigServiceImpl.java     |  20 +-
 .../apache/metron/rest/util/ParserIndex.java    |  92 ++++++
 .../metron-rest/src/main/scripts/metron-rest.sh |  17 +
 .../metron-parsers/3rdPartyParser.md            | 330 +++++++++++++++++++
 metron-platform/metron-parsers/pom.xml          |   6 +
 .../topology/MergeAndShadeTransformer.java      | 101 ++++++
 .../parsers/topology/ParserTopologyCLI.java     |  18 +-
 .../src/main/scripts/start_parser_topology.sh   |   9 +-
 10 files changed, 576 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml 
b/metron-interface/metron-rest/pom.xml
index 302b784..44bad97 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -189,6 +189,7 @@
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-parsers</artifactId>
             <version>${project.parent.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.hbase</groupId>
@@ -317,6 +318,7 @@
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-indexing</artifactId>
             <version>${project.parent.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -417,6 +419,7 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            
<createDependencyReducedPom>false</createDependencyReducedPom>
                             <transformers>
                                 <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                     
<resource>META-INF/spring.handlers</resource>

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
index ac89b1d..5135849 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.rest;
 
+import org.apache.metron.rest.util.ParserIndex;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@@ -24,6 +25,7 @@ import 
org.springframework.boot.autoconfigure.SpringBootApplication;
 public class MetronRestApplication {
 
   public static void main(String[] args) {
+    // Build the parser index (a classpath scan for MessageParser implementations) before the Spring application starts.
+    ParserIndex.reload();
     SpringApplication.run(MetronRestApplication.class, args);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index c460e3c..85b84b8 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -38,10 +38,12 @@ import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.util.ParserIndex;
 import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
 import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -118,29 +120,15 @@ public class SensorParserConfigServiceImpl implements 
SensorParserConfigService
 
   @Override
   public Map<String, String> getAvailableParsers() {
-    if (availableParsers == null) {
-      availableParsers = new HashMap<>();
-      Set<Class<? extends MessageParser>> parserClasses = getParserClasses();
-      parserClasses.forEach(parserClass -> {
-        if (!"BasicParser".equals(parserClass.getSimpleName())) {
-          
availableParsers.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""),
-              parserClass.getName());
-        }
-      });
-    }
-    return availableParsers;
+    // Delegate to the shared ParserIndex cache instead of scanning the classpath in this service.
+    return ParserIndex.INSTANCE.getIndex();
   }
 
   @Override
   public Map<String, String> reloadAvailableParsers() {
-    availableParsers = null;
+    // Rescan the classpath, then return the refreshed index.  reload() is static, so it is invoked
+    // on the type rather than through the INSTANCE constant.
+    ParserIndex.reload();
     return getAvailableParsers();
   }
 
-  private Set<Class<? extends MessageParser>> getParserClasses() {
-    Reflections reflections = new Reflections("org.apache.metron.parsers");
-    return reflections.getSubTypesOf(MessageParser.class);
-  }
 
   @Override
   public JSONObject parseMessage(ParseMessageRequest parseMessageRequest) 
throws RestException {

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java
new file mode 100644
index 0000000..ef9c83c
--- /dev/null
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/util/ParserIndex.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.util;
+
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Index the parsers.  Analyzing the classpath is a costly operation, so caching it makes sense.
+ * Eventually, we will probably want to have a timer that periodically reindexes so that new parsers show up.
+ *
+ * <p>The index is built lazily on first access, or explicitly via {@link #reload()}.  Every read and
+ * write of the cached state goes through the same monitor ({@code ParserIndex.class}), so readers
+ * never observe a partially published index.  The cached collections are shared by every caller and
+ * are therefore handed out as unmodifiable views.
+ */
+public enum ParserIndex {
+  INSTANCE;
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Guarded by ParserIndex.class.  Both are null until the first load and non-null afterwards.
+  private static Set<Class<? extends MessageParser>> index;
+  private static Map<String, String> availableParsers;
+
+  /**
+   * Returns the parser name mapping, building the index on first use.
+   *
+   * @return an unmodifiable map from short parser name (the simple class name with "Basic" and
+   *         "Parser" removed) to the fully-qualified parser class name
+   */
+  public Map<String, String> getIndex() {
+    // Same monitor as load(): static synchronized methods lock ParserIndex.class.
+    synchronized (ParserIndex.class) {
+      if (availableParsers == null) {
+        load();
+      }
+      return availableParsers;
+    }
+  }
+
+  /**
+   * Returns every {@link MessageParser} subtype found on the classpath, building the index on first use.
+   *
+   * @return an unmodifiable set of parser classes
+   */
+  public Set<Class<? extends MessageParser>> getClasses() {
+    synchronized (ParserIndex.class) {
+      if (index == null) {
+        load();
+      }
+      return index;
+    }
+  }
+
+  /**
+   * Rescans the classpath and replaces the cached index.
+   */
+  public static void reload() {
+    load();
+  }
+
+  /**
+   * To handle the situation where classpath is specified in the manifest of the jar, we have to augment the URLs.
+   * This happens as part of the surefire plugin as well as elsewhere in the wild (especially in maven when running tests. ;).
+   * @param classLoaders the class loaders whose URLs, plus any manifest Class-Path entries, should be scanned
+   * @return A collection of URLs representing the effective classpath URLs
+   */
+  private static Collection<URL> effectiveClassPathUrls(ClassLoader... classLoaders) {
+    return ClasspathHelper.forManifest(ClasspathHelper.forClassLoader(classLoaders));
+  }
+
+  /**
+   * Scans the classpath for {@link MessageParser} subtypes and swaps in the new index.
+   * {@code BasicParser} itself is left out of the name mapping.
+   */
+  private static synchronized void load() {
+    LOG.debug("Starting Parser Index Load");
+    ClassLoader classLoader = ParserIndex.class.getClassLoader();
+    Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(effectiveClassPathUrls(classLoader)));
+    Set<Class<? extends MessageParser>> indexLoc = reflections.getSubTypesOf(MessageParser.class);
+    Map<String, String> availableParsersLoc = new HashMap<>();
+    indexLoc.forEach(parserClass -> {
+      if (!"BasicParser".equals(parserClass.getSimpleName())) {
+        availableParsersLoc.put(parserClass.getSimpleName().replaceAll("Basic|Parser", ""),
+                parserClass.getName());
+      }
+    });
+    LOG.debug("Finished Parser Index Load; found {} parsers, indexed {} parsers", indexLoc.size(), availableParsersLoc.size());
+    // Publish read-only views: the same instances are returned to every caller of the getters.
+    index = java.util.Collections.unmodifiableSet(indexLoc);
+    availableParsers = java.util.Collections.unmodifiableMap(availableParsersLoc);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh 
b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
index 638589a..f9a2b69 100644
--- a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
+++ b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
@@ -21,6 +21,12 @@ if [ -z "${METRON_JDBC_PASSWORD}" ]; then
     echo "METRON_JDBC_PASSWORD unset. Exiting."
     exit 1
 fi
+## Join a list by a character
+function join_by {
+  local IFS="$1"
+  shift
+  echo "$*" 
+}
 
 METRON_VERSION=${project.version}
 METRON_HOME="${METRON_HOME:-/usr/metron/${METRON_VERSION}}"
@@ -29,6 +35,8 @@ METRON_REST_PORT=8082
 METRON_SYSCONFIG="${METRON_SYSCONFIG:-/etc/default/metron}"
 METRON_LOG_DIR="${METRON_LOG_DIR:-/var/log/metron}"
 METRON_PID_FILE="${METRON_PID_FILE:-/var/run/metron/metron-rest.pid}"
+PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib}
+# Quote the -name pattern so the shell cannot glob-expand it against the current directory before find sees it.
+PARSER_LIB=$(find "$METRON_HOME/lib/" -name "metron-parsers*.jar")
 
 echo "METRON_VERSION=${METRON_VERSION}"
 echo "METRON_HOME=${METRON_HOME}"
@@ -47,6 +55,15 @@ rest_jar_pattern="${METRON_HOME}/lib/metron-rest*.jar"
 rest_files=( ${rest_jar_pattern} )
 echo "Default metron-rest jar is: ${rest_files[0]}"
 METRON_REST_CLASSPATH+=":${rest_files[0]}"
+METRON_REST_CLASSPATH+=":$PARSER_LIB"
+
+if [ -d "$PARSER_CONTRIB" ]; then
+  contrib_list=( "${PARSER_CONTRIB}"/*.jar ) # expand the glob to a list (directory part quoted)
+  # With no matching jars the glob stays as the literal pattern; only extend the classpath when a jar exists.
+  if [ -e "${contrib_list[0]}" ]; then
+    contrib_classpath=$(join_by : "${contrib_list[@]}") #join the list by a colon
+    echo "Parser Contrib jars are: $contrib_classpath"
+    METRON_REST_CLASSPATH+=":${contrib_classpath}"
+  fi
+fi
 
 echo "METRON_SPRING_PROFILES_ACTIVE=${METRON_SPRING_PROFILES_ACTIVE}"
 

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/3rdPartyParser.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/3rdPartyParser.md 
b/metron-platform/metron-parsers/3rdPartyParser.md
new file mode 100644
index 0000000..61095ea
--- /dev/null
+++ b/metron-platform/metron-parsers/3rdPartyParser.md
@@ -0,0 +1,330 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Custom Metron Parsers
+
+We have many stock parsers for normal operations.  Some of these are
+networking and cybersecurity focused (e.g. the ASA Parser), some of
+these are general purpose (e.g. the CSVParser), but inevitably users
+will want to extend the system to process their own data formats.  This
+document walks through how to create and use a custom parser within
+Metron.
+
+# Writing A Custom Parser
+Before we can use a custom parser, we first need to write one.  The
+parser is the workhorse of Metron ingest.  It provides the mapping
+between the raw data coming in via the Kafka value and a `JSONObject`,
+the internal data structure Metron uses to represent a message.
+
+## Implementation
+
+In order to create a custom parser, we need to do one of the following:
+* Write a class which conforms to the `org.apache.metron.parsers.interfaces.MessageParser<JSONObject>` and `java.util.Serializable` interfaces
+  * Implement `init()`, `configure(Map<String, Object> config)`, `validate(JSONObject message)`, and `List<JSONObject> parse(byte[] rawMessage)`
+* Write a class which extends `org.apache.metron.parsers.BasicParser`
+  * Provides a convenience implementation of `validate` which ensures the `timestamp` and `original_string` fields exist, so you only need to implement `init()`, `configure(...)`, and `parse(...)` (as in the example below).
+
+## Example
+
+In order to illustrate how this might be done, let's create a very
+simple parser that takes a comma separated pair and creates a couple of
+fields:
+* `original_string` -- the raw data
+* `timestamp` -- the current time
+* `first` -- the first field of the comma separated pair
+* `last` -- the last field of the comma separated pair
+
+For this demonstration, let's create a maven project to build our
+parser.  We'll call it `extra_parsers`, so in your workspace, let's set
+up the maven project:
+* Create the maven infrastructure for `extra_parsers` via
+```
+mkdir -p extra_parsers/src/{main,test}/java
+```
+* Create a pom file indicating how we should build our parsers by
+  editing `extra_parsers/pom.xml` with the following content:
+```
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.3rdparty</groupId>
+  <artifactId>extra-parsers</artifactId>
+  <packaging>jar</packaging>
+  <version>1.0-SNAPSHOT</version>
+  <name>extra-parsers</name>
+  <url>http://thirdpartysoftware.org</url>
+  <properties>
+    <!-- The java version to conform to.  Metron works all the way to 1.8 -->
+    <java_version>1.8</java_version>
+    <!-- The version of Metron that we'll be targeting. -->
+    <metron_version>0.4.1</metron_version>
+    <!-- To complete the simulation, we'll depend on a common dependency -->
+    <guava_version>19.0</guava_version>
+    <!-- We will shade our dependencies to create a single jar at the end -->
+    <shade_version>2.4.3</shade_version>
+  </properties>
+  <dependencies>
+    <!--
+    We want to depend on Metron, but ensure that the scope is "provided"
+    as we do not want to include it in our bundle.
+    -->
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-parsers</artifactId>
+      <version>${metron_version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+     <!-- We will set up the shade plugin to create a single jar at the
+           end of the build lifecycle.  We will exclude some things and
+           relocate others to simulate a real situation.
+           
+           One thing to note is that it's a good practice to shade and
+           relocate common libraries that may be dependencies in Metron.
+           Your jar will be merged with the parsers jar, so the metron
+           version will be included for all overlapping classes.
+           So, shade and relocate to ensure that YOUR version of the library 
is used.
+      -->
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${shade_version}</version>
+        <configuration>
+          <createDependencyReducedPom>true</createDependencyReducedPom>
+          <artifactSet>
+            <excludes>
+              <!-- Exclude slf4j for no reason other than to illustrate how to 
exclude dependencies.
+                   The metron team has nothing against slf4j. :-)
+               -->
+              <exclude>*slf4j*</exclude>
+            </excludes>
+          </artifactSet>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <shadedClassifierName>uber</shadedClassifierName>
+              <filters>
+                <filter>
+                  <!-- Sometimes these get added and confuse the uber jar out 
of shade -->
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <!-- Relocate guava as it's used in Metron and we really want our own version (19.0) -->
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>com.thirdparty.guava</shadedPattern>
+                </relocation>
+              </relocations>
+              <artifactSet>
+                <excludes>
+                  <!-- We can also exclude by artifactId and groupId -->
+                  <exclude>storm:storm-core:*</exclude>
+                  <exclude>storm:storm-lib:*</exclude>
+                  <exclude>org.slf4j.impl*</exclude>
+                  <exclude>org.slf4j:slf4j-log4j*</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!--
+      We want to make sure we compile using java 1.8.
+      -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.5.1</version>
+        <configuration>
+          <forceJavacCompilerUse>true</forceJavacCompilerUse>
+          <source>${java_version}</source>
+          <compilerArgument>-Xlint:unchecked</compilerArgument>
+          <target>${java_version}</target>
+          <showWarnings>true</showWarnings>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
+```
+* Now let's create our parser `com.thirdparty.SimpleParser` by creating the file `extra_parsers/src/main/java/com/thirdparty/SimpleParser.java` (create the `com/thirdparty` package directory first) with the following content:
+```
+package com.thirdparty;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.metron.parsers.BasicParser;
+import org.json.simple.JSONObject;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A toy parser that splits a comma separated pair into {@code first} and {@code last} fields.
+ */
+public class SimpleParser extends BasicParser {
+  @Override
+  public void init() {
+    // Nothing to initialize.
+  }
+
+  @Override
+  public List<JSONObject> parse(byte[] bytes) {
+    // Decode explicitly as UTF-8 rather than relying on the JVM's platform default charset.
+    String input = new String(bytes, StandardCharsets.UTF_8);
+    Iterable<String> it = Splitter.on(",").split(input);
+    JSONObject ret = new JSONObject();
+    ret.put("original_string", input);
+    ret.put("timestamp", System.currentTimeMillis());
+    ret.put("first", Iterables.getFirst(it, "missing"));
+    ret.put("last", Iterables.getLast(it, "missing"));
+    return ImmutableList.of(ret);
+  }
+
+  @Override
+  public void configure(Map<String, Object> map) {
+    // This parser takes no configuration.
+  }
+}
+```
+* Compile the parser via `mvn clean package` in `extra_parsers`
+
+This will create a jar containing your parser and its dependencies (sans Metron dependencies) in `extra_parsers/target/extra-parsers-1.0-SNAPSHOT-uber.jar`.
+
+# Deploying Your Custom Parser
+
+In order to deploy your newly built custom parser, place the jar file above in the `$METRON_HOME/parser_contrib` directory on every Metron host that needs it: any host you start parsers from, as well as the host where the Metron REST service runs.
+
+## Example
+
+Let's work through deploying the example above.
+
+### Preliminaries
+
+We assume that the following environment variables are set:
+* `METRON_HOME` - the home directory for metron
+* `ZOOKEEPER` - The zookeeper quorum (comma separated with port specified: 
e.g. `node1:2181` for full-dev)
+* `BROKERLIST` - The Kafka broker list (comma separated with port specified: 
e.g. `node1:6667` for full-dev)
+* `ES_HOST` - The elasticsearch master (and port) e.g. `node1:9200` for 
full-dev.
+
+Also, this assumes that you are not using a kerberized cluster.  If you are, then the parser start command will need to be adjusted slightly to include the security protocol.
+
+### Copy the jar file up
+
+Copy the jar file located in `extra_parsers/target/extra-parsers-1.0-SNAPSHOT-uber.jar` to `$METRON_HOME/parser_contrib` and ensure the permissions are such that the `metron` user can read and execute it.
+
+### Restart the REST service in Ambari
+
+In order for new parsers to be picked up, the REST service must be restarted.  
You can do that from within Ambari by restarting the `Metron REST` service.
+
+### Push the Zookeeper Configs
+
+Now push the config to Zookeeper with the following command:
+`$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ 
-z $ZOOKEEPER`
+
+
+### Create a Kafka Topic
+
+Create a kafka topic, let's call it `test` via:
+`/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER 
--create --topic test --partitions 1 --replication-factor 1`
+
+Note that in a real deployment, the topic would be named something more descriptive, and its replication factor and partition count would be set to less trivial values.
+
+### Configure Test Parser
+
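+Create a file called `$METRON_HOME/config/zookeeper/parsers/test.json` with the following content, and then re-run the `zk_load_configs.sh` PUSH command from above so that this new parser configuration reaches Zookeeper: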
+```
+{
+  "parserClassName":"com.thirdparty.SimpleParser",
+  "sensorTopic":"test"
+}
+```
+
+### Start Parser
+Now we can start the parser and send some data through:
+* Start the parser
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s test
+```
+* Send example data through:
+```
+echo "apache,metron" | 
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list 
$BROKERLIST --topic test
+```
+* Validate data was written in ES:
+```
+curl -XPOST "http://$ES_HOST/test*/_search?pretty" -d '
+{
+  "_source" : [ "original_string", "timestamp", "first", "last"]
+}
+'
+```
+This should yield something like:
+```
+{
+  "took" : 23,
+  "timed_out" : false,
+  "_shards" : {
+    "total" : 1,
+    "successful" : 1,
+    "failed" : 0
+  },
+  "hits" : {
+    "total" : 1,
+    "max_score" : 1.0,
+    "hits" : [ {
+      "_index" : "test_index_2017.10.04.17",
+      "_type" : "test_doc",
+      "_id" : "3ae4dd4d-8c09-4f2a-93c0-26ec5508baaa",
+      "_score" : 1.0,
+      "_source" : {
+        "original_string" : "apache,metron",
+        "last" : "metron",
+        "first" : "apache",
+        "timestamp" : 1507138373223
+      }
+    } ]
+  }
+}
+```
+
+### Via the Management UI
+
+As long as the REST service is restarted after new parsers are added to `$METRON_HOME/parser_contrib`, they are available in the Management UI for creating and deploying parsers.

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml 
b/metron-platform/metron-parsers/pom.xml
index 18377d3..f856654 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -125,6 +125,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
+            <artifactId>storm-rename-hack</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${global_storm_version}</version>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java
new file mode 100644
index 0000000..eaadf71
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/MergeAndShadeTransformer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import com.google.common.base.Splitter;
+import org.apache.storm.daemon.JarTransformer;
+import org.apache.storm.hack.StormShadeTransformer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+/**
+ * This is a storm jar transformer that will add in additional jars pulled 
from an
+ * environment variable.  The jars will be merged with the main uber jar and 
then
+ * the resulting jar will be shaded and relocated according to the 
StormShadeTransformer.
+ *
+ */
+public class MergeAndShadeTransformer implements JarTransformer {
+  public static final String EXTRA_JARS_ENV = "EXTRA_JARS";
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private StormShadeTransformer underlyingTransformer = new 
StormShadeTransformer();
+  @Override
+  public void transform(InputStream input, OutputStream output) throws 
IOException {
+    String extraJars = System.getenv().get(EXTRA_JARS_ENV);
+    if(extraJars == null || extraJars.length() == 0) {
+      underlyingTransformer.transform(input, output);
+      return;
+    }
+    File tmpFile = File.createTempFile("metron", "jar");
+    tmpFile.deleteOnExit();
+    Set<String> entries = new HashSet<>();
+    try (JarOutputStream jout = new JarOutputStream(new 
BufferedOutputStream(new FileOutputStream(tmpFile)))) {
+      try (JarInputStream jin = new JarInputStream(new 
BufferedInputStream(input))){
+        copy(jin, jout, entries);
+      }
+      for (String fileStr : Splitter.on(",").split(extraJars)) {
+        File f = new File(fileStr);
+        if (!f.exists()) {
+          continue;
+        }
+        LOG.info("Merging jar {} from {}", f.getName(), f.getAbsolutePath());
+        try (JarInputStream jin = new JarInputStream(new 
BufferedInputStream(new FileInputStream(f)))) {
+          copy(jin, jout, entries);
+        }
+      }
+    }
+    underlyingTransformer.transform(new BufferedInputStream(new 
FileInputStream(tmpFile)), output);
+  }
+
+  /**
+   * Merges two jars.  The first jar will get merged into the output jar.
+   * A running set of jar entries is kept so that duplicates are skipped.
+   * This has the side-effect that the first instance of a given entry will be 
added
+   * and all subsequent entries are skipped.
+   *
+   * @param jin The input jar
+   * @param jout The output jar
+   * @param entries The set of existing entries.  Note that this set will be 
mutated as part of this call.
+   * @return The set of entries.
+   * @throws IOException
+   */
+  private Set<String> copy(JarInputStream jin, JarOutputStream jout, 
Set<String> entries) throws IOException {
+    byte[] buffer = new byte[1024];
+    for(JarEntry entry = jin.getNextJarEntry(); entry != null; entry = 
jin.getNextJarEntry()) {
+      if(entries.contains(entry.getName())) {
+        continue;
+      }
+      LOG.debug("Merging jar entry {}", entry.getName());
+      entries.add(entry.getName());
+      jout.putNextEntry(entry);
+      int len = 0;
+      while( (len = jin.read(buffer)) > 0 ) {
+        jout.write(buffer, 0, len);
+      }
+    }
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index b5ee628..4ce0508 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -23,10 +23,6 @@ import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
@@ -285,7 +281,19 @@ public class ParserTopologyCLI {
   }
 
   private static CommandLine parse(Options options, String[] args) {
-    CommandLineParser parser = new PosixParser();
+    /*
+     * The general gist is that in order to pass args to storm jar,
+     * we have to disregard options that we don't know about in the CLI.
+     * Storm will ignore our args, we have to do the same.
+     */
+    CommandLineParser parser = new PosixParser() {
+      @Override
+      protected void processOption(String arg, ListIterator iter) throws 
ParseException {
+        if(getOptions().hasOption(arg)) {
+          super.processOption(arg, iter);
+        }
+      }
+    };
     try {
       return ParserOptions.parse(parser, args);
     } catch (ParseException pe) {

http://git-wip-us.apache.org/repos/asf/metron/blob/a262f87f/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh 
b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
index 8faf89e..00ac809 100755
--- a/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
+++ b/metron-platform/metron-parsers/src/main/scripts/start_parser_topology.sh
@@ -19,4 +19,11 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=metron-parsers-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR 
org.apache.metron.parsers.topology.ParserTopologyCLI "$@"
+PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib}
+# Build the comma separated jar list that MergeAndShadeTransformer reads from EXTRA_JARS.
+# An unmatched glob is left as the literal pattern, so only files that actually exist are kept.
+EXTRA_JARS=""
+if [ -d "$PARSER_CONTRIB" ]; then
+  for jar in "$PARSER_CONTRIB"/*.jar; do
+    [ -e "$jar" ] || continue
+    EXTRA_JARS="${EXTRA_JARS:+$EXTRA_JARS,}$jar"
+  done
+fi
+if [ -n "$EXTRA_JARS" ]; then
+  export STORM_EXT_CLASSPATH=$METRON_HOME/lib/$TOPOLOGY_JAR
+  export EXTRA_JARS
+  storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@" -c client.jartransformer.class=org.apache.metron.parsers.topology.MergeAndShadeTransformer
+else
+  storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.metron.parsers.topology.ParserTopologyCLI "$@"
+fi

Reply via email to