http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
index af90e14..82cb8cb 100644
--- 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
+++ 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
@@ -71,6 +71,7 @@ public class ConfigurationFunctions {
     }
     CuratorFramework client = (CuratorFramework) clientOpt.get();
     TreeCache cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+    TreeCache exCache = new TreeCache(client, 
Constants.ZOOKEEPER_EXTENSIONS_ROOT);
     TreeCacheListener listener = new TreeCacheListener() {
       @Override
       public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
@@ -91,6 +92,9 @@ public class ConfigurationFunctions {
           } else if 
(path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
             Map<String, String> sensorMap = (Map<String, 
String>)configMap.get(ConfigurationType.INDEXING);
             sensorMap.put(sensor, new String(data));
+          } else if 
(path.startsWith(ConfigurationType.PARSER_EXTENSION.getZookeeperRoot())) {
+            Map<String,String> parserExtensionMap = 
(Map<String,String>)configMap.get(ConfigurationType.PARSER_EXTENSION);
+            parserExtensionMap.put(sensor,new String(data));
           }
         }
         else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
@@ -99,6 +103,9 @@ public class ConfigurationFunctions {
           if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
             Map<String, String> sensorMap = (Map<String, 
String>)configMap.get(ConfigurationType.PARSER);
             sensorMap.remove(sensor);
+          }else if 
(path.startsWith(ConfigurationType.PARSER_EXTENSION.getZookeeperRoot())) {
+            Map<String,String> parserExtensionMap = 
(Map<String,String>)configMap.get(ConfigurationType.PARSER_EXTENSION);
+            parserExtensionMap.remove(sensor);
           }
           else if 
(path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
             Map<String, String> sensorMap = (Map<String, 
String>)configMap.get(ConfigurationType.ENRICHMENT);
@@ -117,8 +124,32 @@ public class ConfigurationFunctions {
         }
       }
     };
+    TreeCacheListener exListener = new TreeCacheListener() {
+      @Override
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) 
throws Exception {
+        if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || 
event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+          String path = event.getData().getPath();
+          byte[] data = event.getData().getData();
+          String sensor = Iterables.getLast(Splitter.on("/").split(path), 
null);
+           if 
(path.startsWith(ConfigurationType.PARSER_EXTENSION.getZookeeperRoot())) {
+            Map<String,String> parserExtensionMap = 
(Map<String,String>)configMap.get(ConfigurationType.PARSER_EXTENSION);
+            parserExtensionMap.put(sensor,new String(data));
+          }
+        }
+        else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
+          String path = event.getData().getPath();
+          String sensor = Iterables.getLast(Splitter.on("/").split(path), 
null);
+          if 
(path.startsWith(ConfigurationType.PARSER_EXTENSION.getZookeeperRoot())) {
+            Map<String,String> parserExtensionMap = 
(Map<String,String>)configMap.get(ConfigurationType.PARSER_EXTENSION);
+            parserExtensionMap.remove(sensor);
+          }
+        }
+      }
+    };
     cache.getListenable().addListener(listener);
     cache.start();
+    exCache.getListenable().addListener(exListener);
+    exCache.start();
     for(ConfigurationType ct : ConfigurationType.values()) {
       switch(ct) {
         case GLOBAL:
@@ -146,17 +177,28 @@ public class ConfigurationFunctions {
             }
           }
           break;
+        case PARSER_EXTENSION:
+        {
+          List<String> extensionIds = 
client.getChildren().forPath(ct.getZookeeperRoot());
+          Map<String,String> parserExtensionMap = 
(Map<String,String>)configMap.get(ct);
+          for (String extensionId : extensionIds){
+            parserExtensionMap.put(extensionId, new 
String(ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot() + "/" + 
extensionId,client)));
+          }
+        }
+        break;
       }
     }
     context.addCapability("treeCache", () -> cache);
+    context.addCapability("exTreeCache",() -> exCache);
   }
 
   @Stellar(
            namespace = "CONFIG"
           ,name = "GET"
           ,description = "Retrieve a Metron configuration from zookeeper."
-          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, 
PROFILER"
+          ,params = {"type - One of ENRICHMENT, INDEXING, 
PARSER,PARSER_EXTENSION, GLOBAL, PROFILER"
                     , "sensor - Sensor to retrieve (required for enrichment 
and parser, not used for profiler and global)"
+                    , "extensionID - Parser Extension to retrieve (required 
for PARSER_EXTENSION, not used for profiler,global,indexing,enrichment,parser)"
                     , "emptyIfNotPresent - If true, then return an empty, 
minimally viable config"
                     }
           ,returns = "The String representation of the config in zookeeper"
@@ -191,6 +233,14 @@ public class ConfigurationFunctions {
           }
           return ret;
         }
+        case PARSER_EXTENSION: {
+          String extensionID = (String) args.get(1);
+          if(args.size() > 2) {
+            emptyIfNotPresent = ConversionUtils.convert(args.get(2), 
Boolean.class);
+          }
+          Map<String,String> parserExtensionMap = (Map<String,String>) 
configMap.get(type);
+          return parserExtensionMap.get(extensionID);
+        }
         case INDEXING: {
           String sensor = (String) args.get(1);
           if(args.size() > 2) {

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 431ece2..af6b3da 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -37,7 +37,7 @@ import org.junit.Test;
 
 import java.util.HashMap;
 
-import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH;
+import static org.apache.metron.TestConstants.A_PARSER_CONFIGS_PATH_FMT;
 import static org.apache.metron.TestConstants.SAMPLE_CONFIG_PATH;
 import static org.apache.metron.management.utils.FileUtils.slurp;
 import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
@@ -46,6 +46,7 @@ public class ConfigurationFunctionsTest {
   private static TestingServer testZkServer;
   private static CuratorFramework client;
   private static String zookeeperUrl;
+  private static final String sensorType = "bro";
   private Context context = new Context.Builder()
             .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
             .build();
@@ -57,7 +58,7 @@ public class ConfigurationFunctionsTest {
     client.start();
 
     pushConfigs(SAMPLE_CONFIG_PATH);
-    pushConfigs(PARSER_CONFIGS_PATH);
+    
pushConfigs(String.format(A_PARSER_CONFIGS_PATH_FMT,sensorType,sensorType));
 
 
   }
@@ -73,7 +74,7 @@ public class ConfigurationFunctionsTest {
   }
 
 
-  static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + 
"/parsers/bro.json");
+  static String goodBroParserConfig = 
slurp(String.format(A_PARSER_CONFIGS_PATH_FMT,sensorType,sensorType) + 
"/parsers/bro.json");
 
   /**
     {
@@ -120,6 +121,52 @@ public class ConfigurationFunctionsTest {
     }
   }
 
+  static String goodParserEXConfig = slurp(SAMPLE_CONFIG_PATH + 
"extensions/parsers/metron-test-parsers.json");
+
+  /**
+    {
+   "extensionAssemblyName" : "test.fool-1.0.tar.gz",
+   "extensionBundleName" : "metron-test-parsers-1.0.bundle",
+   "extensionsBundleID" : "metron-test-parsers",
+   "extensionsBundleVersion" : "1.0.0",
+   "parserExtensionParserName" : [ "test2", "test" ]
+   }
+   */
+  @Multiline
+  static String defaultTestParserExtensionConfig;
+
+  @Test
+  public void testParserExtensionGetHappyPath() {
+
+    Object out = run("CONFIG_GET('PARSER_EXTENSION', 'metron-test-parsers')", 
new HashMap<>(), context);
+    Assert.assertEquals(goodParserEXConfig, out);
+  }
+
+  @Test
+  public void testParserExtensionGetMissWithoutDefault() {
+
+    {
+      Object out = run("CONFIG_GET('PARSER_EXTENSION', 'brop', false)", new 
HashMap<>(), context);
+      Assert.assertNull(out);
+    }
+  }
+
+  @Test
+  public void testParserExtensionGetMissWithNoDefaultSupport() throws 
Exception {
+    JSONObject expected = (JSONObject) new 
JSONParser().parse(defaultTestParserExtensionConfig);
+
+    {
+      Object out = run("CONFIG_GET('PARSER_EXTENSION', 
'metron-test-parsers')", new HashMap<>(), context);
+      JSONObject actual = (JSONObject) new 
JSONParser().parse(out.toString().trim());
+      Assert.assertEquals(expected, actual);
+    }
+    {
+      // does not support default
+      Object out = run("CONFIG_GET('PARSER_EXTENSION', 'brop', true)", new 
HashMap<>(), context);
+      Assert.assertNull(out);
+    }
+  }
+
   static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + 
"/enrichments/test.json");
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
index f830e62..0ca296a 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
@@ -31,15 +31,15 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH;
+import static org.apache.metron.TestConstants.A_PARSER_CONFIGS_PATH_FMT;
 import static org.apache.metron.management.utils.FileUtils.slurp;
 import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
 import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
 
 public class ParserConfigFunctionsTest {
 
-  String emptyTransformationsConfig = slurp(PARSER_CONFIGS_PATH + 
"/parsers/bro.json");
-  String existingTransformationsConfig = slurp(PARSER_CONFIGS_PATH + 
"/parsers/squid.json");
+  String emptyTransformationsConfig = 
slurp(String.format(A_PARSER_CONFIGS_PATH_FMT,"bro","bro") + 
"/parsers/bro.json");
+  String existingTransformationsConfig = 
slurp(String.format(A_PARSER_CONFIGS_PATH_FMT,"squid","squid") + 
"/parsers/squid.json");
   Map<String, StellarExecutor.VariableResult> variables ;
   Context context = null;
   @Before

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index f03abdf..3f8acbb 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -9,7 +9,7 @@ enrichment and indexing.
 There are two general types types of parsers:
 *  A parser written in Java which conforms to the `MessageParser` interface.  
This kind of parser is optimized for speed and performance and is built for use 
with higher velocity topologies.  These parsers are not easily modifiable and 
in order to make changes to them the entire topology need to be recompiled.  
 * A general purpose parser.  This type of parser is primarily designed for 
lower-velocity topologies or for quickly standing up a parser for a new 
telemetry before a permanent Java parser can be written for it.  As of the time 
of this writing, we have:
-  * Grok parser: `org.apache.metron.parsers.GrokParser` with possible 
`parserConfig` entries of 
+  * Grok parser: `org.apache.metron.parsers.grok.GrokParser` with possible 
`parserConfig` entries of 
     * `grokPath` : The path in HDFS (or in the Jar) to the grok statement
     * `patternLabel` : The pattern label to use from the grok statement
     * `timestampField` : The field to use for timestamp
@@ -251,7 +251,7 @@ Consider the following example configuration for the `yaf` 
sensor:
 
 ```
 {
-  "parserClassName":"org.apache.metron.parsers.GrokParser",
+  "parserClassName":"org.apache.metron.parsers.grok.GrokParser",
   "sensorTopic":"yaf",
   "fieldTransformations" : [
                     {
@@ -286,7 +286,7 @@ Java parser adapters are indended for higher-velocity 
topologies and are not eas
 ### Grok Parser Adapters
 Grok parser adapters are designed primarly for someone who is not a Java coder 
for quickly standing up a parser adapter for lower velocity topologies.  Grok 
relies on Regex for message parsing, which is much slower than purpose-built 
Java parsers, but is more extensible.  Grok parsers are defined via a config 
file and the topplogy does not need to be recombiled in order to make changes 
to them.  An example of a Grok perser is:
 
-* org.apache.metron.parsers.GrokParser
+* org.apache.metron.parsers.grok.GrokParser
 
 For more information on the Grok project please refer to the following link:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml 
b/metron-platform/metron-parsers/pom.xml
index b7c21ff..b3b5657 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -30,6 +30,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>bundles-lib</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
@@ -230,6 +235,11 @@
             <version>2.2.6</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.atteo.classindex</groupId>
+            <artifactId>classindex</artifactId>
+            <version>${global_classindex_version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/asa.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/asa.json 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/asa.json
deleted file mode 100644
index 489a047..0000000
--- a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/asa.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
-  "parserClassName": "org.apache.metron.parsers.asa.BasicAsaParser",
-  "sensorTopic": "asa",
-  "parserConfig": {
-    "deviceTimeZone": "UTC"
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/bro.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/bro.json 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/bro.json
deleted file mode 100644
index a9750c2..0000000
--- a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/bro.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
-  "sensorTopic":"bro",
-  "parserConfig": {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/snort.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/snort.json 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/snort.json
deleted file mode 100644
index be36fa2..0000000
--- 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/snort.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "parserClassName":"org.apache.metron.parsers.snort.BasicSnortParser",
-  "sensorTopic":"snort",
-  "parserConfig": {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/squid.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/squid.json 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/squid.json
deleted file mode 100644
index e44c4c2..0000000
--- 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/squid.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-  "parserClassName": "org.apache.metron.parsers.GrokParser",
-  "sensorTopic": "squid",
-  "parserConfig": {
-    "grokPath": "/patterns/squid",
-    "patternLabel": "SQUID_DELIMITED",
-    "timestampField": "timestamp"
-  },
-  "fieldTransformations" : [
-    {
-      "transformation" : "STELLAR"
-    ,"output" : [ "full_hostname", "domain_without_subdomains" ]
-    ,"config" : {
-      "full_hostname" : "URL_TO_HOST(url)"
-      ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
-                }
-    }
-                           ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/websphere.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/websphere.json
 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/websphere.json
deleted file mode 100644
index 0f2c901..0000000
--- 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/websphere.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "parserClassName":"org.apache.metron.parsers.websphere.GrokWebSphereParser",
-  "sensorTopic":"websphere",
-  "parserConfig":
-  {
-    "grokPath":"/patterns/websphere",
-    "patternLabel":"WEBSPHERE",
-    "timestampField":"timestamp_string",
-    "dateFormat":"yyyy MMM dd HH:mm:ss"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json 
b/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
deleted file mode 100644
index 1267554..0000000
--- a/metron-platform/metron-parsers/src/main/config/zookeeper/parsers/yaf.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "parserClassName":"org.apache.metron.parsers.GrokParser",
-  "sensorTopic":"yaf",
-  "fieldTransformations" : [
-                    {
-                      "input" : "protocol"
-                     ,"transformation": "IP_PROTOCOL"
-                    }
-                    ],
-  "parserConfig":
-  {
-    "grokPath":"/patterns/yaf",
-    "patternLabel":"YAF_DELIMITED",
-    "timestampField":"start_time",
-    "timeFields": ["start_time", "end_time"],
-    "dateFormat":"yyyy-MM-dd HH:mm:ss.S"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/BasicParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/BasicParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/BasicParser.java
index 49157d5..37bb856 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/BasicParser.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/BasicParser.java
@@ -30,6 +30,7 @@ public abstract class BasicParser implements
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public BasicParser(){}
   @Override
   public boolean validate(JSONObject message) {
     JSONObject value = message;

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
deleted file mode 100644
index 99ac390..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.Constants;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GrokParser implements MessageParser<JSONObject>, Serializable {
-
-  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  protected transient Grok grok;
-  protected String grokPath;
-  protected String patternLabel;
-  protected List<String> timeFields = new ArrayList<>();
-  protected String timestampField;
-  protected SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.S z");
-  protected String patternsCommonDir = "/patterns/common";
-
-  @Override
-  public void configure(Map<String, Object> parserConfig) {
-    this.grokPath = (String) parserConfig.get("grokPath");
-    this.patternLabel = (String) parserConfig.get("patternLabel");
-    this.timestampField = (String) parserConfig.get("timestampField");
-    List<String> timeFieldsParam = (List<String>) 
parserConfig.get("timeFields");
-    if (timeFieldsParam != null) {
-      this.timeFields = timeFieldsParam;
-    }
-    String dateFormatParam = (String) parserConfig.get("dateFormat");
-    if (dateFormatParam != null) {
-      this.dateFormat = new SimpleDateFormat(dateFormatParam);
-    }
-    String timeZoneParam = (String) parserConfig.get("timeZone");
-    if (timeZoneParam != null) {
-      dateFormat.setTimeZone(TimeZone.getTimeZone(timeZoneParam));
-      LOG.debug("Grok Parser using provided TimeZone: {}", timeZoneParam);
-    } else {
-      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-      LOG.debug("Grok Parser using default TimeZone (UTC)");
-    }
-  }
-
-  public InputStream openInputStream(String streamName) throws IOException {
-    FileSystem fs = FileSystem.get(new Configuration());
-    Path path = new Path(streamName);
-    if(fs.exists(path)) {
-      return fs.open(path);
-    } else {
-      return getClass().getResourceAsStream(streamName);
-    }
-  }
-
-  @Override
-  public void init() {
-    grok = new Grok();
-    try {
-      InputStream commonInputStream = openInputStream(patternsCommonDir);
-      LOG.debug("Grok parser loading common patterns from: {}", 
patternsCommonDir);
-
-      if (commonInputStream == null) {
-        throw new RuntimeException(
-                "Unable to initialize grok parser: Unable to load " + 
patternsCommonDir + " from either classpath or HDFS");
-      }
-
-      grok.addPatternFromReader(new InputStreamReader(commonInputStream));
-      LOG.debug("Loading parser-specific patterns from: {}", grokPath);
-
-      InputStream patterInputStream = openInputStream(grokPath);
-      if (patterInputStream == null) {
-        throw new RuntimeException("Grok parser unable to initialize grok 
parser: Unable to load " + grokPath
-                + " from either classpath or HDFS");
-      }
-      grok.addPatternFromReader(new InputStreamReader(patterInputStream));
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Grok parser set the following grok expression: {}", 
grok.getNamedRegexCollectionById(patternLabel));
-      }
-
-      String grokPattern = "%{" + patternLabel + "}";
-
-      grok.compile(grokPattern);
-      LOG.debug("Compiled grok pattern {}", grokPattern);
-
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException("Grok parser Error: " + e.getMessage(), e);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<JSONObject> parse(byte[] rawMessage) {
-    if (grok == null) {
-      init();
-    }
-    List<JSONObject> messages = new ArrayList<>();
-    String originalMessage = null;
-    try {
-      originalMessage = new String(rawMessage, "UTF-8");
-      LOG.debug("Grok parser parsing message: {}",originalMessage);
-      Match gm = grok.match(originalMessage);
-      gm.captures();
-      JSONObject message = new JSONObject();
-      message.putAll(gm.toMap());
-
-      if (message.size() == 0)
-        throw new RuntimeException("Grok statement produced a null message. 
Original message was: "
-                + originalMessage + " and the parsed message was: " + message 
+ " . Check the pattern at: "
-                + grokPath);
-
-      message.put("original_string", originalMessage);
-      for (String timeField : timeFields) {
-        String fieldValue = (String) message.get(timeField);
-        if (fieldValue != null) {
-          message.put(timeField, toEpoch(fieldValue));
-        }
-      }
-      if (timestampField != null) {
-        message.put(Constants.Fields.TIMESTAMP.getName(), 
formatTimestamp(message.get(timestampField)));
-      }
-      message.remove(patternLabel);
-      postParse(message);
-      messages.add(message);
-      LOG.debug("Grok parser parsed message: {}", message);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IllegalStateException("Grok parser Error: " + e.getMessage() + 
" on " + originalMessage , e);
-    }
-    return messages;
-  }
-
-  @Override
-  public boolean validate(JSONObject message) {
-    LOG.debug("Grok parser validating message: {}", message);
-
-    Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
-    if (timestampObject instanceof Long) {
-      Long timestamp = (Long) timestampObject;
-      if (timestamp > 0) {
-        LOG.debug("Grok parser validated message: {}", message);
-        return true;
-      }
-    }
-
-    LOG.debug("Grok parser did not validate message: {}", message);
-    return false;
-  }
-
-  protected void postParse(JSONObject message) {}
-
-  protected long toEpoch(String datetime) throws ParseException {
-    LOG.debug("Grok parser converting timestamp to epoch: {}", datetime);
-    LOG.debug("Grok parser's DateFormat has TimeZone: {}", 
dateFormat.getTimeZone());
-
-    Date date = dateFormat.parse(datetime);
-    LOG.debug("Grok parser converted timestamp to epoch: {}", date);
-
-    return date.getTime();
-  }
-
-  protected long formatTimestamp(Object value) {
-    LOG.debug("Grok parser formatting timestamp {}", value);
-
-    if (value == null) {
-      throw new RuntimeException(patternLabel + " pattern does not include 
field " + timestampField);
-    }
-    if (value instanceof Number) {
-      return ((Number) value).longValue();
-    } else {
-      return Long.parseLong(Joiner.on("").join(Splitter.on('.').split(value + 
"")));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/asa/BasicAsaParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/asa/BasicAsaParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/asa/BasicAsaParser.java
deleted file mode 100644
index 7ac2398..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/asa/BasicAsaParser.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.asa;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.lang.invoke.MethodHandles;
-import java.time.Clock;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.Match;
-import oi.thekraken.grok.api.exception.GrokException;
-import org.apache.metron.common.Constants;
-import org.apache.metron.parsers.BasicParser;
-import org.apache.metron.parsers.ParseException;
-import org.apache.metron.parsers.utils.SyslogUtils;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BasicAsaParser extends BasicParser {
-
-  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  protected Clock deviceClock;
-  private String syslogPattern = "%{CISCO_TAGGED_SYSLOG}";
-
-  private Grok syslogGrok;
-
-  private static final Map<String, String> patternMap = ImmutableMap.<String, 
String> builder()
-      .put("ASA-2-106001", "CISCOFW106001")
-      .put("ASA-2-106006", "CISCOFW106006_106007_106010")
-      .put("ASA-2-106007", "CISCOFW106006_106007_106010")
-      .put("ASA-2-106010", "CISCOFW106006_106007_106010")
-      .put("ASA-3-106014", "CISCOFW106014")
-      .put("ASA-6-106015", "CISCOFW106015")
-      .put("ASA-1-106021", "CISCOFW106021")
-      .put("ASA-4-106023", "CISCOFW106023")
-      .put("ASA-5-106100", "CISCOFW106100")
-      .put("ASA-6-110002", "CISCOFW110002")
-      .put("ASA-6-302010", "CISCOFW302010")
-      .put("ASA-6-302013", "CISCOFW302013_302014_302015_302016")
-      .put("ASA-6-302014", "CISCOFW302013_302014_302015_302016")
-      .put("ASA-6-302015", "CISCOFW302013_302014_302015_302016")
-      .put("ASA-6-302016", "CISCOFW302013_302014_302015_302016")
-      .put("ASA-6-302020", "CISCOFW302020_302021")
-      .put("ASA-6-302021", "CISCOFW302020_302021")
-      .put("ASA-6-305011", "CISCOFW305011")
-      .put("ASA-3-313001", "CISCOFW313001_313004_313008")
-      .put("ASA-3-313004", "CISCOFW313001_313004_313008")
-      .put("ASA-3-313008", "CISCOFW313001_313004_313008")
-      .put("ASA-4-313005", "CISCOFW313005")
-      .put("ASA-4-402117", "CISCOFW402117")
-      .put("ASA-4-402119", "CISCOFW402119")
-      .put("ASA-4-419001", "CISCOFW419001")
-      .put("ASA-4-419002", "CISCOFW419002")
-      .put("ASA-4-500004", "CISCOFW500004")
-      .put("ASA-6-602303", "CISCOFW602303_602304")
-      .put("ASA-6-602304", "CISCOFW602303_602304")
-      .put("ASA-7-710001", "CISCOFW710001_710002_710003_710005_710006")
-      .put("ASA-7-710002", "CISCOFW710001_710002_710003_710005_710006")
-      .put("ASA-7-710003", "CISCOFW710001_710002_710003_710005_710006")
-      .put("ASA-7-710005", "CISCOFW710001_710002_710003_710005_710006")
-      .put("ASA-7-710006", "CISCOFW710001_710002_710003_710005_710006")
-      .put("ASA-6-713172", "CISCOFW713172")
-      .put("ASA-4-733100", "CISCOFW733100")
-      .put("ASA-6-305012", "CISCOFW305012")
-      .put("ASA-7-609001", "CISCOFW609001")
-      .put("ASA-7-609002", "CISCOFW609002")
-      .put("ASA-5-713041", "CISCOFW713041")
-      .build();
-
-  private Map<String, Grok> grokers = new HashMap<String, 
Grok>(patternMap.size());
-
-  @Override
-  public void configure(Map<String, Object> parserConfig) {
-    String timeZone = (String) parserConfig.get("deviceTimeZone");
-    if (timeZone != null)
-      deviceClock = Clock.system(ZoneId.of(timeZone));
-    else {
-      deviceClock = Clock.systemUTC();
-      LOG.warn("[Metron] No device time zone provided; defaulting to UTC");
-    }
-  }
-
-  private void addGrok(String key, String pattern) throws GrokException {
-    Grok grok = new Grok();
-    InputStream patternStream = 
this.getClass().getResourceAsStream("/patterns/asa");
-    grok.addPatternFromReader(new InputStreamReader(patternStream));
-    grok.compile("%{" + pattern + "}");
-    grokers.put(key, grok);
-  }
-
-  @Override
-  public void init() {
-    syslogGrok = new Grok();
-    InputStream syslogStream = 
this.getClass().getResourceAsStream("/patterns/asa");
-    try {
-      syslogGrok.addPatternFromReader(new InputStreamReader(syslogStream));
-      syslogGrok.compile(syslogPattern);
-    } catch (GrokException e) {
-      LOG.error("[Metron] Failed to load grok patterns from jar", e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    for (Entry<String, String> pattern : patternMap.entrySet()) {
-      try {
-        addGrok(pattern.getKey(), pattern.getValue());
-      } catch (GrokException e) {
-        LOG.error("[Metron] Failed to load grok pattern {} for ASA tag {}", 
pattern.getValue(), pattern.getKey());
-      }
-    }
-
-    LOG.info("[Metron] CISCO ASA Parser Initialized");
-  }
-
-  @Override
-  public List<JSONObject> parse(byte[] rawMessage) {
-    String logLine = "";
-    String messagePattern = "";
-    JSONObject metronJson = new JSONObject();
-    List<JSONObject> messages = new ArrayList<>();
-    Map<String, Object> syslogJson = new HashMap<String, Object>();
-
-    try {
-      logLine = new String(rawMessage, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      LOG.error("[Metron] Could not read raw message", e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    try {
-      LOG.debug("[Metron] Started parsing raw message: {}", logLine);
-      Match syslogMatch = syslogGrok.match(logLine);
-      syslogMatch.captures();
-      if (!syslogMatch.isNull()) {
-       syslogJson = syslogMatch.toMap();
-       LOG.trace("[Metron] Grok CISCO ASA syslog matches: {}", 
syslogMatch.toJson());
-
-       metronJson.put(Constants.Fields.ORIGINAL.getName(), logLine);
-       metronJson.put(Constants.Fields.TIMESTAMP.getName(),
-           SyslogUtils.parseTimestampToEpochMillis((String) 
syslogJson.get("CISCOTIMESTAMP"), deviceClock));
-       metronJson.put("ciscotag", syslogJson.get("CISCOTAG"));
-       metronJson.put("syslog_severity", 
SyslogUtils.getSeverityFromPriority((int) syslogJson.get("syslog_pri")));
-       metronJson.put("syslog_facility", 
SyslogUtils.getFacilityFromPriority((int) syslogJson.get("syslog_pri")));
-
-       if (syslogJson.get("syslog_host") != null) {
-         metronJson.put("syslog_host", syslogJson.get("syslog_host"));
-       }
-       if (syslogJson.get("syslog_prog") != null) {
-         metronJson.put("syslog_prog", syslogJson.get("syslog_prog"));
-       }
-
-      } else
-       throw new RuntimeException(
-           String.format("[Metron] Message '%s' does not match pattern '%s'", 
logLine, syslogPattern));
-    } catch (ParseException e) {
-      LOG.error("[Metron] Could not parse message timestamp", e);
-      throw new RuntimeException(e.getMessage(), e);
-    } catch (RuntimeException e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    try {
-      messagePattern = (String) syslogJson.get("CISCOTAG");
-      Grok asaGrok = grokers.get(messagePattern);
-
-      if (asaGrok == null)
-       LOG.info("[Metron] No pattern for ciscotag '{}'", 
syslogJson.get("CISCOTAG"));
-      else {
-
-       String messageContent = (String) syslogJson.get("message");
-       Match messageMatch = asaGrok.match(messageContent);
-       messageMatch.captures();
-       if (!messageMatch.isNull()) {
-         Map<String, Object> messageJson = messageMatch.toMap();
-         LOG.trace("[Metron] Grok CISCO ASA message matches: {}", 
messageMatch.toJson());
-
-         String src_ip = (String) messageJson.get("src_ip");
-         if (src_ip != null)
-           metronJson.put(Constants.Fields.SRC_ADDR.getName(), src_ip);
-
-         Integer src_port = (Integer) messageJson.get("src_port");
-         if (src_port != null)
-           metronJson.put(Constants.Fields.SRC_PORT.getName(), src_port);
-
-         String dst_ip = (String) messageJson.get("dst_ip");
-         if (dst_ip != null)
-           metronJson.put(Constants.Fields.DST_ADDR.getName(), dst_ip);
-
-         Integer dst_port = (Integer) messageJson.get("dst_port");
-         if (dst_port != null)
-           metronJson.put(Constants.Fields.DST_PORT.getName(), dst_port);
-
-         String protocol = (String) messageJson.get("protocol");
-         if (protocol != null)
-           metronJson.put(Constants.Fields.PROTOCOL.getName(), 
protocol.toLowerCase());
-
-         String action = (String) messageJson.get("action");
-         if (action != null)
-           metronJson.put("action", action.toLowerCase());
-       } else
-         LOG.warn("[Metron] Message '{}' did not match pattern for ciscotag 
'{}'", logLine,
-             syslogJson.get("CISCOTAG"));
-      }
-
-      LOG.debug("[Metron] Final normalized message: {}", 
metronJson.toString());
-
-    } catch (RuntimeException e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    messages.add(metronJson);
-    return messages;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 99785b2..e5c2d1f 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -70,12 +70,21 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
   private transient MessageGetStrategy messageGetStrategy;
   public ParserBolt( String zookeeperUrl
                    , String sensorType
-                   , MessageParser<JSONObject> parser
                    , WriterHandler writer
   )
   {
     super(zookeeperUrl, sensorType);
     this.writer = writer;
+  }
+
+  public ParserBolt( String zookeeperUrl
+          , String sensorType
+          , MessageParser<JSONObject> parser
+          , WriterHandler writer
+  )
+  {
+    super(zookeeperUrl, sensorType);
+    this.writer = writer;
     this.parser = parser;
   }
 
@@ -95,6 +104,16 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
+
+    if(this.parser == null) {
+      Optional<MessageParser<JSONObject>> optParser = 
ParserLoader.loadParser(stormConf, client, getSensorParserConfig());
+      if (optParser.isPresent()) {
+        this.parser = optParser.get();
+        this.parser.configure(getSensorParserConfig().getParserConfig());
+      } else {
+        throw new IllegalStateException("Failed to load parser " + 
getSensorParserConfig().getParserClassName());
+      }
+    }
     initializeStellar();
     if(getSensorParserConfig() != null && filter == null) {
       getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", 
stellarContext);
@@ -105,6 +124,10 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
       }
     }
 
+    if(getSensorParserConfig() != null) {
+      
getSensorParserConfig().getParserConfig().put("globalConfig",globalConfig);
+    }
+
     parser.init();
 
     writer.init(stormConf, context, collector, getConfigurations());

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserLoader.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserLoader.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserLoader.java
new file mode 100644
index 0000000..fbbe396
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserLoader.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.bolt;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemManager;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.bundles.*;
+import org.apache.metron.bundles.bundle.Bundle;
+import org.apache.metron.bundles.util.BundleProperties;
+import org.apache.metron.bundles.util.FileSystemManagerFactory;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Loads a Parser from the Metron Bundle Extension System.
+ * This Bundle may be resident on the local filesystem or it may be located in HDFS.
+ *
+ */
+public class ParserLoader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParserLoader.class);
+
+  /**
+   * Loads a parser through the bundle system, using the bundle properties read from Zookeeper.
+   *
+   * @param stormConfig the storm config (not read by this method)
+   * @param client the CuratorFramework used to read the bundle properties
+   * @param parserConfig the configuration naming the parser class to instantiate
+   * @return Optional of MessageParser<JSONObject>; empty when the bundle properties are
+   *     missing, when no instance was created, or when loading threw (the failure is logged)
+   */
+  @SuppressWarnings("unchecked")
+  public static Optional<MessageParser<JSONObject>> loadParser(Map stormConfig,
+      CuratorFramework client, SensorParserConfig parserConfig) {
+    try {
+      // fetch the BundleProperties from zookeeper
+      Optional<BundleProperties> bundleProperties = getBundleProperties(client);
+      if (!bundleProperties.isPresent()) {
+        LOG.error("BundleProperties are missing!");
+        return Optional.empty();
+      }
+      BundleSystem bundleSystem =
+          new BundleSystem.Builder().withBundleProperties(bundleProperties.get()).build();
+      MessageParser<JSONObject> parser =
+          bundleSystem.createInstance(parserConfig.getParserClassName(), MessageParser.class);
+      // ofNullable rather than of: Optional.of throws NullPointerException on a null value,
+      // which would escape to the caller instead of being reported as "no parser".
+      return Optional.ofNullable(parser);
+    } catch (Exception e) {
+      LOG.error("Failed to load parser " + parserConfig.getParserClassName(), e);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Reads the bundle properties stored at the "/bundle.properties" node under the Zookeeper root.
+   *
+   * @param client the CuratorFramework used to read from Zookeeper
+   * @return the bundle properties, or an empty Optional when the node yields no data
+   * @throws Exception propagated from the Zookeeper read or from creating the properties
+   */
+  private static Optional<BundleProperties> getBundleProperties(CuratorFramework client)
+      throws Exception {
+    byte[] propBytes = ConfigurationsUtils
+        .readFromZookeeper(Constants.ZOOKEEPER_ROOT + "/bundle.properties", client);
+    if (propBytes == null || propBytes.length == 0) {
+      // nothing stored: report absence instead of wrapping null in Optional.of
+      return Optional.empty();
+    }
+    // read in the properties
+    return Optional.ofNullable(BundleProperties
+        .createBasicBundleProperties(new ByteArrayInputStream(propBytes), new HashMap<>()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/BasicBroParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/BasicBroParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/BasicBroParser.java
deleted file mode 100644
index 5264750..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/BasicBroParser.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.bro;
-
-import java.lang.invoke.MethodHandles;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.metron.common.Constants;
-import org.apache.metron.parsers.BasicParser;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("serial")
-public class BasicBroParser extends BasicParser {
-
-  protected static final Logger _LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final ThreadLocal<NumberFormat> DECIMAL_FORMAT = new 
ThreadLocal<NumberFormat>() {
-    @Override
-    protected NumberFormat initialValue() {
-      return new DecimalFormat("0.0#####");
-    }
-  };
-  private JSONCleaner cleaner = new JSONCleaner();
-
-  @Override
-  public void configure(Map<String, Object> parserConfig) {
-
-  }
-
-  @Override
-  public void init() {
-
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public List<JSONObject> parse(byte[] msg) {
-
-    _LOG.trace("[Metron] Starting to parse incoming message");
-
-    String rawMessage = null;
-    List<JSONObject> messages = new ArrayList<>();
-    try {
-      rawMessage = new String(msg, "UTF-8");
-      _LOG.trace("[Metron] Received message: {}", rawMessage);
-
-      JSONObject cleanedMessage = cleaner.clean(rawMessage);
-      _LOG.debug("[Metron] Cleaned message: {}", cleanedMessage);
-
-      if (cleanedMessage == null || cleanedMessage.isEmpty()) {
-        throw new Exception("Unable to clean message: " + rawMessage);
-      }
-
-      String key;
-      JSONObject payload;
-      if (cleanedMessage.containsKey("type")) {
-        key = cleanedMessage.get("type").toString();
-        payload = cleanedMessage;
-      } else {
-        key = cleanedMessage.keySet().iterator().next().toString();
-
-        if (key == null) {
-          throw new Exception("Unable to retrieve key for message: "
-                  + rawMessage);
-        }
-
-        payload = (JSONObject) cleanedMessage.get(key);
-      }
-
-      if (payload == null) {
-        throw new Exception("Unable to retrieve payload for message: "
-                + rawMessage);
-      }
-
-      String originalString = key.toUpperCase() + " |";
-      for (Object k : payload.keySet()) {
-        Object raw = payload.get(k);
-        String value = raw.toString();
-        if (raw instanceof Double) {
-          value = DECIMAL_FORMAT.get().format(raw);
-        }
-        originalString += " " + k.toString() + ":" + value;
-      }
-      payload.put("original_string", originalString);
-
-      replaceKey(payload, Constants.Fields.TIMESTAMP.getName(), new String[]{ 
"ts" });
-
-      long timestamp = 0L;
-      if (payload.containsKey(Constants.Fields.TIMESTAMP.getName())) {
-        try {
-          Double broTimestamp = ((Number) 
payload.get(Constants.Fields.TIMESTAMP.getName())).doubleValue();
-          String broTimestampFormatted = 
DECIMAL_FORMAT.get().format(broTimestamp);
-          timestamp = convertToMillis(broTimestamp);
-          payload.put(Constants.Fields.TIMESTAMP.getName(), timestamp);
-          payload.put("bro_timestamp", broTimestampFormatted);
-          _LOG.trace("[Metron] new bro record - timestamp : {}", 
payload.get(Constants.Fields.TIMESTAMP.getName()));
-        } catch (NumberFormatException nfe) {
-          _LOG.error("[Metron] timestamp is invalid: {}", 
payload.get("timestamp"));
-          payload.put(Constants.Fields.TIMESTAMP.getName(), 0);
-        }
-      }
-
-      boolean ipSrcReplaced = replaceKey(payload, 
Constants.Fields.SRC_ADDR.getName(), new String[]{"source_ip", "id.orig_h"});
-      if (!ipSrcReplaced) {
-        replaceKeyArray(payload, Constants.Fields.SRC_ADDR.getName(), new 
String[]{ "tx_hosts" });
-      }
-
-      boolean ipDstReplaced = replaceKey(payload, 
Constants.Fields.DST_ADDR.getName(), new String[]{"dest_ip", "id.resp_h"});
-      if (!ipDstReplaced) {
-        replaceKeyArray(payload, Constants.Fields.DST_ADDR.getName(), new 
String[]{ "rx_hosts" });
-      }
-
-      replaceKey(payload, Constants.Fields.SRC_PORT.getName(), new 
String[]{"source_port", "id.orig_p"});
-      replaceKey(payload, Constants.Fields.DST_PORT.getName(), new 
String[]{"dest_port", "id.resp_p"});
-
-      payload.put(Constants.Fields.PROTOCOL.getName(), key);
-      _LOG.debug("[Metron] Returning parsed message: {}", payload);
-      messages.add(payload);
-      return messages;
-
-    } catch (Exception e) {
-      String message = "Unable to parse Message: " + rawMessage;
-      _LOG.error(message, e);
-      throw new IllegalStateException(message, e);
-    }
-
-  }
-
-  private Long convertToMillis(Double timestampSeconds) {
-    return ((Double) (timestampSeconds * 1000)).longValue();
-  }
-
-  private boolean replaceKey(JSONObject payload, String toKey, String[] 
fromKeys) {
-    for (String fromKey : fromKeys) {
-      if (payload.containsKey(fromKey)) {
-        Object value = payload.remove(fromKey);
-        payload.put(toKey, value);
-        _LOG.trace("[Metron] Added {} to {}", toKey, payload);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean replaceKeyArray(JSONObject payload, String toKey, String[] 
fromKeys) {
-    for (String fromKey : fromKeys) {
-      if (payload.containsKey(fromKey)) {
-        JSONArray value = (JSONArray) payload.remove(fromKey);
-        if (value != null && !value.isEmpty()) {
-          payload.put(toKey, value.get(0));
-          _LOG.trace("[Metron] Added {} to {}", toKey, payload);
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/JSONCleaner.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/JSONCleaner.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/JSONCleaner.java
deleted file mode 100644
index d910d12..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bro/JSONCleaner.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.bro;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-
-public class JSONCleaner implements Serializable {
-
-       /**
-        * 
-        */
-       private static final long serialVersionUID = 1L;
-
-
-       /**
-        * @param jsonString
-        * @return
-        * @throws ParseException
-        * Takes a json String as input and modifies the keys to remove any 
characters other than . _ a-z A-Z or 0-9
-        */
-       @SuppressWarnings({"unchecked","rawtypes"})
-       public JSONObject clean(String jsonString) throws ParseException
-       {
-               JSONParser parser = new JSONParser();
-               
-               
-               Map json = (Map) parser.parse(jsonString);
-               JSONObject output = new JSONObject();
-           Iterator iter = json.entrySet().iterator();
-
-                while(iter.hasNext()){
-                     Map.Entry entry = (Map.Entry)iter.next();
-                     
-                     String key = 
((String)entry.getKey()).replaceAll("[^\\._a-zA-Z0-9]+","");
-                     output.put(key, entry.getValue());
-                   }
-
-               return output;
-       }
-       
-       
-       @SuppressWarnings({ "unchecked", "rawtypes", "unused" })
-       public static void main(String args[])
-       {
-               String jsonText = "{\"first_1\": 123, \"second\": [4, 5, 6], 
\"third\": 789}";
-               JSONCleaner cleaner = new JSONCleaner();
-               try {
-                       //cleaner.clean(jsonText);
-                       Map obj=new HashMap();
-                         obj.put("name","foo");
-                         obj.put("num", 100);
-                         obj.put("balance", 1000.21);
-                         obj.put("is_vip", true);
-                         obj.put("nickname",null);
-                       Map obj1 = new HashMap();
-                       obj1.put("sourcefile", obj);
-                       
-                       JSONObject json = new JSONObject(obj1);
-                       System.out.println(json);
-                         
-                         
-                         
-                         System.out.print(jsonText);
-               } catch (Exception e) {
-                       e.printStackTrace();
-               }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/cef/CEFParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/cef/CEFParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/cef/CEFParser.java
deleted file mode 100644
index 03de37c..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/cef/CEFParser.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.cef;
-
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.Charset;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.metron.parsers.BasicParser;
-import org.apache.metron.parsers.ParseException;
-import org.apache.metron.parsers.utils.DateUtils;
-import org.apache.metron.parsers.utils.SyslogUtils;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CEFParser extends BasicParser {
-       private static final long serialVersionUID = 1L;
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-       private static final String HEADER_CAPTURE_PATTERN = "[^\\|]*";
-       private static final String EXTENSION_CAPTURE_PATTERN = "(?<!\\\\)=";
-       private static final Charset UTF_8 = Charset.forName("UTF-8");
-
-       private Pattern p;
-       private Pattern pext;
-
-       public void init() {
-
-               // CEF Headers: Device Vendor|Device Product|Device 
Version|Device Event
-               // Class ID|Name|Severity
-
-               String syslogTime = 
"(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b
 +(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) 
(?!<[0-9])(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9])(?::(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))(?![0-9])?";
-               String syslogTime5424 = 
"(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d+)?(?:Z|[+-]\\d{2}:\\d{2}))";
-               String syslogPriority = "<(?:[0-9]+)>";
-               String syslogHost = "[a-z0-9\\.\\\\-_]+";
-
-               StringBuilder sb = new StringBuilder("");
-               sb.append("(?<syslogPriority>");
-               sb.append(syslogPriority);
-               sb.append(")?");
-               sb.append("(?<syslogTime>");
-               sb.append(syslogTime);
-               sb.append("|");
-               sb.append(syslogTime5424);
-               sb.append(")?");
-
-               sb.append("(?<syslogHost>");
-               sb.append(syslogHost);
-               sb.append(")?");
-
-               sb.append(".*");
-
-               sb.append("CEF: ?0\\|");
-
-               headerBlock("DeviceVendor", sb);
-               sb.append("\\|");
-               headerBlock("DeviceProduct", sb);
-               sb.append("\\|");
-               headerBlock("DeviceVersion", sb);
-               sb.append("\\|");
-               headerBlock("DeviceEvent", sb);
-               sb.append("\\|");
-               headerBlock("Name", sb);
-               sb.append("\\|");
-               headerBlock("Severity", sb);
-               sb.append("\\|");
-
-               // extension capture:
-               sb.append("(?<extensions>.*)");
-               String pattern = sb.toString();
-
-               p = Pattern.compile(pattern);
-
-               // key finder for extensions
-               pext = Pattern.compile(EXTENSION_CAPTURE_PATTERN);
-       }
-
-       @SuppressWarnings("unchecked")
-       public List<JSONObject> parse(byte[] rawMessage) {
-               List<JSONObject> messages = new ArrayList<>();
-
-               String cefString = new String(rawMessage, UTF_8);
-
-               Matcher matcher = p.matcher(cefString);
-
-               while (matcher.find()) {
-                       JSONObject obj = new JSONObject();
-                       if (matcher.matches()) {
-                               LOG.info("Found %d groups", 
matcher.groupCount());
-                               obj.put("DeviceVendor", 
matcher.group("DeviceVendor"));
-                               obj.put("DeviceProduct", 
matcher.group("DeviceProduct"));
-                               obj.put("DeviceVersion", 
matcher.group("DeviceVersion"));
-                               obj.put("DeviceEvent", 
matcher.group("DeviceEvent"));
-                               obj.put("Name", matcher.group("Name"));
-                               obj.put("Severity", 
standardizeSeverity(matcher.group("Severity")));
-                       }
-
-                       String ext = matcher.group("extensions");
-                       Matcher m = pext.matcher(ext);
-
-                       int index = 0;
-                       String key = null;
-                       String value = null;
-                       Map<String, String> labelMap = new HashMap<String, 
String>();
-
-                       while (m.find()) {
-                               if (key == null) {
-                                       key = ext.substring(index, m.start());
-                                       index = m.end();
-                                       if (!m.find()) {
-                                               break;
-                                       }
-                               }
-                               value = ext.substring(index, m.start());
-                               index = m.end();
-                               int v = value.lastIndexOf(" ");
-                               if (v > 0) {
-                                       String temp = value.substring(0, 
v).trim();
-                                       if (key.endsWith("Label")) {
-                                               labelMap.put(key.substring(0, 
key.length() - 5), temp);
-                                       } else {
-                                               obj.put(key, temp);
-                                       }
-                                       key = value.substring(v).trim();
-                               }
-                       }
-                       value = ext.substring(index);
-
-                       // Build a map of Label extensions to apply later
-                       if (key.endsWith("Label")) {
-                               labelMap.put(key.substring(0, key.length() - 
5), value);
-                       } else {
-                               obj.put(key, value);
-                       }
-
-                       // Apply the labels to custom fields
-                       for (Entry<String, String> label : labelMap.entrySet()) 
{
-                               mutate(obj, label.getKey(), label.getValue());
-                       }
-
-                       // Rename standard CEF fields to comply with Metron 
standards
-                       obj = mutate(obj, "dst", "ip_dst_addr");
-                       obj = mutate(obj, "dpt", "ip_dst_port");
-                       obj = convertToInt(obj, "ip_dst_port");
-
-                       obj = mutate(obj, "src", "ip_src_addr");
-                       obj = mutate(obj, "spt", "ip_src_port");
-                       obj = convertToInt(obj, "ip_src_port");
-
-                       obj = mutate(obj, "act", "deviceAction");
-                       // applicationProtocol
-                       obj = mutate(obj, "app", "protocol");
-
-                       obj.put("original_string", cefString);
-
-                       // apply timestamp from message if present, using rt, 
syslog
-                       // timestamp,
-                       // default to current system time
-
-                       if (obj.containsKey("rt")) {
-                               String rt = (String) obj.get("rt");
-                               try {
-                                       obj.put("timestamp", 
DateUtils.parseMultiformat(rt, DateUtils.DATE_FORMATS_CEF));
-                               } catch (java.text.ParseException e) {
-                                       throw new IllegalStateException("rt 
field present in CEF but cannot be parsed", e);
-                               }
-                       } else {
-                               String logTimestamp = 
matcher.group("syslogTime");
-                               if (!(logTimestamp == null || 
logTimestamp.isEmpty())) {
-                                       try {
-                                               obj.put("timestamp", 
SyslogUtils.parseTimestampToEpochMillis(logTimestamp, Clock.systemUTC()));
-                                       } catch (ParseException e) {
-                                               throw new 
IllegalStateException("Cannot parse syslog timestamp", e);
-                                       }
-                               } else {
-                                       obj.put("timestamp", 
System.currentTimeMillis());
-                               }
-                       }
-
-                       // add the host
-                       String host = matcher.group("syslogHost");
-                       if (!(host == null || host.isEmpty())) {
-                               obj.put("host", host);
-                       }
-
-                       messages.add(obj);
-               }
-               return messages;
-       }
-
-       @SuppressWarnings("unchecked")
-       private JSONObject convertToInt(JSONObject obj, String key) {
-               if (obj.containsKey(key)) {
-                       obj.put(key, Integer.valueOf((String) obj.get(key)));
-               }
-               return obj;
-       }
-
-       private void headerBlock(String name, StringBuilder sb) {
-               
sb.append("(?<").append(name).append(">").append(HEADER_CAPTURE_PATTERN).append(")");
-       }
-
-       /**
-        * Maps string based severity in CEF format to integer.
-        * 
-        * The strings are mapped according to the CEF 23 specification, taking 
the
-        * integer value as the value of the range buckets rounded up
-        * 
-        * The valid string values are: Unknown, Low, Medium, High, and 
Very-High.
-        * The valid integer values are: 0-3=Low, 4-6=Medium, 7- 8=High, and
-        * 9-10=Very-High.
-        * 
-        * @param severity
-        *            String or Integer
-        * @return Integer value mapped from the string
-        */
-       private Integer standardizeSeverity(String severity) {
-               if (severity.length() < 3) {
-                       // should be a number
-                       return Integer.valueOf(severity);
-               } else {
-                       switch (severity) {
-                       case "Low":
-                               return 2;
-                       case "Medium":
-                               return 5;
-                       case "High":
-                               return 8;
-                       case "Very-High":
-                               return 10;
-                       default:
-                               return 0;
-                       }
-               }
-       }
-
-       @Override
-       public void configure(Map<String, Object> config) {
-               // TODO Auto-generated method stub
-
-       }
-
-       @SuppressWarnings("unchecked")
-       private JSONObject mutate(JSONObject json, String oldKey, String 
newKey) {
-               if (json.containsKey(oldKey)) {
-                       json.put(newKey, json.remove(oldKey));
-               }
-               return json;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
deleted file mode 100644
index 489eb00..0000000
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.fireeye;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.parsers.utils.ParserUtils;
-import org.apache.metron.parsers.BasicParser;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class BasicFireEyeParser extends BasicParser {
-
-       private static final long serialVersionUID = 6328907550159134550L;
-       protected static final Logger LOG = LoggerFactory
-                                       .getLogger(BasicFireEyeParser.class);
-
-
-       String tsRegex 
="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
-       
-       
-       Pattern tsPattern = Pattern.compile(tsRegex);
-       // private transient static MetronGrok grok;
-       // private transient static InputStream pattern_url;
-
-       public BasicFireEyeParser() throws Exception {
-               // pattern_url = 
getClass().getClassLoader().getResourceAsStream(
-               // "patterns/fireeye");
-               //
-               // File file = ParserUtils.stream2file(pattern_url);
-               // grok = MetronGrok.create(file.getPath());
-               //
-               // grok.compile("%{FIREEYE_BASE}");
-       }
-
-       @Override
-       public void configure(Map<String, Object> parserConfig) {
-
-       }
-
-       @Override
-       public void init() {
-
-       }
-
-       @Override
-       public List<JSONObject> parse(byte[] raw_message) {
-               String toParse = "";
-               List<JSONObject> messages = new ArrayList<>();
-               try {
-
-                       toParse = new String(raw_message, "UTF-8");
-
-                       // String[] mTokens = toParse.split(" ");
-
-                       String positveIntPattern = "<[1-9][0-9]*>";
-                       Pattern p = Pattern.compile(positveIntPattern);
-                       Matcher m = p.matcher(toParse);
-
-                       String delimiter = "";
-
-                       while (m.find()) {
-                               delimiter = m.group();
-
-                       }
-
-                       if (!StringUtils.isBlank(delimiter)) {
-                               String[] tokens = toParse.split(delimiter);
-
-                               if (tokens.length > 1)
-                                       toParse = delimiter + tokens[1];
-
-                       }
-
-                       JSONObject toReturn = parseMessage(toParse);
-
-                       toReturn.put("timestamp", 
getTimeStamp(toParse,delimiter));
-                       messages.add(toReturn);
-                       return messages;
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       return null;
-               }
-
-       }
-
-       private long getTimeStamp(String toParse,String delimiter) throws 
ParseException {
-               
-               long ts = 0;
-               String month = null;
-               String day = null;
-               String time = null;
-               Matcher tsMatcher = tsPattern.matcher(toParse);
-               if (tsMatcher.find()) {
-                       month = tsMatcher.group(1);
-                       day = tsMatcher.group(2);
-                       time = tsMatcher.group(3);
-                       ts = ParserUtils.convertToEpoch(month, day, time, true);
-               } else {
-                       LOG.warn("Unable to find timestamp in message: {}", 
toParse);
-               }
-
-               return ts;
-       }
-
-       private JSONObject parseMessage(String toParse) {
-
-               // System.out.println("Received message: " + toParse);
-
-               // MetronMatch gm = grok.match(toParse);
-               // gm.captures();
-
-               JSONObject toReturn = new JSONObject();
-               //toParse = toParse.replaceAll("  ", " ");
-               String[] mTokens = toParse.split("\\s+");
-        //mTokens = toParse.split(" ");
-
-               // toReturn.putAll(gm.toMap());
-
-               String id = mTokens[4];
-
-               // We are not parsing the fedata for multi part message as we 
cannot
-               // determine how we can split the message and how many multi 
part
-               // messages can there be.
-               // The message itself will be stored in the response.
-
-               String[] tokens = id.split("\\.");
-               if (tokens.length == 2) {
-
-                       String[] array = Arrays.copyOfRange(mTokens, 1, 
mTokens.length - 1);
-                       String syslog = Joiner.on(" ").join(array);
-
-                       Multimap<String, String> multiMap = formatMain(syslog);
-
-                       for (String key : multiMap.keySet()) {
-
-                               String value = 
Joiner.on(",").join(multiMap.get(key));
-                               toReturn.put(key, value.trim());
-                       }
-
-               }
-
-               toReturn.put("original_string", toParse);
-
-               String ip_src_addr = (String) toReturn.get("dvc");
-               String ip_src_port = (String) toReturn.get("src_port");
-               String ip_dst_addr = (String) toReturn.get("dst_ip");
-               String ip_dst_port = (String) toReturn.get("dst_port");
-
-               if (ip_src_addr != null)
-                       toReturn.put("ip_src_addr", ip_src_addr);
-               if (ip_src_port != null)
-                       toReturn.put("ip_src_port", ip_src_port);
-               if (ip_dst_addr != null)
-                       toReturn.put("ip_dst_addr", ip_dst_addr);
-               if (ip_dst_port != null)
-                       toReturn.put("ip_dst_port", ip_dst_port);
-
-//             System.out.println(toReturn);
-
-               return toReturn;
-       }
-
-       private Multimap<String, String> formatMain(String in) {
-               Multimap<String, String> multiMap = ArrayListMultimap.create();
-               String input = in.replaceAll("cn3", "dst_port")
-                               .replaceAll("cs5", 
"cncHost").replaceAll("proto", "protocol")
-                               .replaceAll("rt=", 
"timestamp=").replaceAll("cs1", "malware")
-                               .replaceAll("dst=", "dst_ip=")
-                               .replaceAll("shost", "src_hostname")
-                               .replaceAll("dmac", 
"dst_mac").replaceAll("smac", "src_mac")
-                               .replaceAll("spt", "src_port")
-                               .replaceAll("\\bsrc\\b", "src_ip");
-               String[] tokens = input.split("\\|");
-
-               if (tokens.length > 0) {
-                       String message = tokens[tokens.length - 1];
-
-                       String pattern = 
"([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) ";
-                       Pattern p = Pattern.compile(pattern);
-                       Matcher m = p.matcher(message);
-
-                       while (m.find()) {
-                               String[] str = m.group().split("=");
-                               multiMap.put(str[0], str[1]);
-
-                       }
-
-               }
-               return multiMap;
-       }
-
-       
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/ffcb91ed/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/grok/GrokBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/grok/GrokBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/grok/GrokBuilder.java
new file mode 100644
index 0000000..89f6f94
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/grok/GrokBuilder.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.metron.parsers.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.utils.ResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p> GrokBuilder builds an instance of Grok with patterns loaded. </p> 
Either the Parser
+ * Configuration, with keys : grokPath and patternLabel must be passed or 
those values must be
+ * provided using the with statements.
+ */
+public class GrokBuilder {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private Map<String, Object> parserConfiguration;
+  protected String patternsCommonDir = "/patterns/common";
+  protected List<Reader> readers = new ArrayList<>();
+  protected String grokPath;
+  protected String patternLabel;
+  protected boolean loadCommon = true;
+
+  /**
+   * Constructor.
+   */
+  public GrokBuilder() {
+  }
+
+  /**
+   * GrokBuilder with a parserConfiguration.
+   *
+   * @param parserConfiguration the configuration
+   * @return GrokBuilder
+   */
+  public GrokBuilder withParserConfiguration(Map<String, Object> 
parserConfiguration) {
+    this.parserConfiguration = parserConfiguration;
+    return this;
+  }
+
+  /**
+   * GrokBuilder with a {@link Reader} to read the file input.
+   *
+   * @param reader a Reader instance
+   * @return the Builder
+   */
+  public GrokBuilder withReader(Reader reader) {
+    this.readers.add(reader);
+    return this;
+  }
+
+  /**
+   * GrokBuilder with a grokPath.
+   *
+   * @param grokPath the grokPath
+   * @return the Builder
+   */
+  public GrokBuilder withGrokPath(String grokPath) {
+    if (StringUtils.isEmpty(grokPath)) {
+      throw new IllegalArgumentException("grokPath cannot be empty");
+    }
+    this.grokPath = grokPath;
+    return this;
+  }
+
+  /**
+   * GrokBuilder with a given pattern label.
+   *
+   * @param patternLabel the pattern label
+   * @return the Builder
+   */
+  public GrokBuilder withPatternLabel(String patternLabel) {
+    if (StringUtils.isEmpty(patternLabel)) {
+      throw new IllegalArgumentException("patternLabel cannot be empty");
+    }
+    this.patternLabel = patternLabel;
+    return this;
+  }
+
+  /**
+   * GrokBuilder with a given patterns common directory.
+   *
+   * @param patternsCommonDir the pattern common directory
+   * @return the Builder
+   */
+  public GrokBuilder withPatternsCommonDir(String patternsCommonDir) {
+    this.patternsCommonDir = patternsCommonDir;
+    return this;
+  }
+
+  /**
+   * Flag to try to load common grok file.
+   *
+   * @param loadCommon flag, if true load if false do not
+   * @return the Builder
+   */
+  public GrokBuilder withLoadCommon(boolean loadCommon) {
+    this.loadCommon = loadCommon;
+    return this;
+  }
+
+  /**
+   * Builds a fully configured Grok. Fully configured means that the Grok has 
all of the configured
+   * patterns loaded.
+   *
+   * @return Grok
+   * @throws Exception if there are problems loading or compiling the patterns 
into Grok
+   */
+  @SuppressWarnings("unchecked")
+  public Grok build() throws Exception {
+    if (parserConfiguration != null) {
+      if (parserConfiguration.containsKey("grokPath")) {
+        grokPath = (String) parserConfiguration.get("grokPath");
+      }
+      if (parserConfiguration.containsKey("patternLabel")) {
+        patternLabel = (String) parserConfiguration.get("patternLabel");
+      }
+      if (parserConfiguration.containsKey("readers")) {
+        List<Reader> configReaders = (List<Reader>) 
parserConfiguration.get("readers");
+        this.readers.addAll(configReaders);
+      }
+      if (parserConfiguration.containsKey("loadCommon")) {
+        this.loadCommon = (Boolean) parserConfiguration.get("loadCommon");
+      }
+    } else {
+      if (StringUtils.isEmpty(grokPath) && readers.size() == 0) {
+        throw new IllegalArgumentException("missing required grokPath");
+      }
+      if (StringUtils.isEmpty(patternLabel) && readers.size() == 0) {
+        throw new IllegalArgumentException("missing required patternLabel");
+      }
+    }
+
+    Grok grok = new Grok();
+    Map<String, Object> config = parserConfiguration;
+    if (config == null) {
+      config = new HashMap<>();
+    }
+    Object globalObject = config.get("globalConfig");
+    Map<String, Object> globalConfig = null;
+    if (globalObject != null) {
+      globalConfig = (Map<String, Object>) globalObject;
+    }
+
+    try (ResourceLoader resourceLoader = new ResourceLoader.Builder()
+        .withConfiguration(globalConfig).build()) {
+      Map<String, InputStream> streamMap = null;
+      if (loadCommon) {
+        LOG.debug("Grok parser loading common patterns from: {}", 
patternsCommonDir);
+        streamMap = resourceLoader.getResources(patternsCommonDir);
+        load(grok, streamMap);
+      }
+      if (readers.size() == 0) {
+        if (StringUtils.isNotEmpty(grokPath)) {
+          LOG.debug("Loading parser-specific patterns from: {}", grokPath);
+
+          streamMap = resourceLoader.getResources(grokPath);
+
+          load(grok, streamMap);
+        }
+      } else {
+        LOG.debug("Loading pattern for reader");
+        readers.forEach(x -> {
+          try {
+            load(grok, x);
+          } catch (GrokException e) {
+            LOG.error("error loading grok pattern from reader", e);
+            throw new IllegalStateException(e);
+          }
+
+        });
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Grok parser set the following grok expression: {}",
+            grok.getNamedRegexCollectionById(patternLabel));
+      }
+      if (StringUtils.isNotEmpty(patternLabel)) {
+        String grokPattern = "%{" + patternLabel + "}";
+        grok.compile(grokPattern);
+        LOG.debug("Compiled grok pattern {}", grokPattern);
+      }
+
+      return grok;
+    }
+  }
+
+  private void load(Grok grok, Map<String, InputStream> streams) throws 
IOException, GrokException {
+    for (Entry<String, InputStream> entry : streams.entrySet()) {
+      try (InputStream thisStream = entry.getValue()) {
+        if (thisStream == null) {
+          throw new RuntimeException(
+              "Unable to initialize grok parser: Unable to load " + 
entry.getKey()
+                  + " from either HDFS or locally");
+        }
+        LOG.debug("Loading patterns from: {}", entry.getKey());
+        grok.addPatternFromReader(new InputStreamReader(thisStream));
+      }
+    }
+  }
+
+  private void load(Grok grok, Reader reader) throws GrokException {
+    grok.addPatternFromReader(reader);
+  }
+
+}

Reply via email to