http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
index 489eb00..1cf9fb7 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,18 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.metron.parsers.fireeye;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.parsers.utils.ParserUtils;
-import org.apache.metron.parsers.BasicParser;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,189 +30,162 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.parsers.BasicParser;
+import org.apache.metron.parsers.utils.ParserUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class BasicFireEyeParser extends BasicParser {
-
-       private static final long serialVersionUID = 6328907550159134550L;
-       protected static final Logger LOG = LoggerFactory
-                                       .getLogger(BasicFireEyeParser.class);
-
-
-       String tsRegex 
="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
-       
-       
-       Pattern tsPattern = Pattern.compile(tsRegex);
-       // private transient static MetronGrok grok;
-       // private transient static InputStream pattern_url;
-
-       public BasicFireEyeParser() throws Exception {
-               // pattern_url = 
getClass().getClassLoader().getResourceAsStream(
-               // "patterns/fireeye");
-               //
-               // File file = ParserUtils.stream2file(pattern_url);
-               // grok = MetronGrok.create(file.getPath());
-               //
-               // grok.compile("%{FIREEYE_BASE}");
-       }
-
-       @Override
-       public void configure(Map<String, Object> parserConfig) {
-
-       }
-
-       @Override
-       public void init() {
-
-       }
-
-       @Override
-       public List<JSONObject> parse(byte[] raw_message) {
-               String toParse = "";
-               List<JSONObject> messages = new ArrayList<>();
-               try {
-
-                       toParse = new String(raw_message, "UTF-8");
-
-                       // String[] mTokens = toParse.split(" ");
-
-                       String positveIntPattern = "<[1-9][0-9]*>";
-                       Pattern p = Pattern.compile(positveIntPattern);
-                       Matcher m = p.matcher(toParse);
-
-                       String delimiter = "";
-
-                       while (m.find()) {
-                               delimiter = m.group();
-
-                       }
-
-                       if (!StringUtils.isBlank(delimiter)) {
-                               String[] tokens = toParse.split(delimiter);
-
-                               if (tokens.length > 1)
-                                       toParse = delimiter + tokens[1];
-
-                       }
-
-                       JSONObject toReturn = parseMessage(toParse);
-
-                       toReturn.put("timestamp", 
getTimeStamp(toParse,delimiter));
-                       messages.add(toReturn);
-                       return messages;
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       return null;
-               }
-
-       }
-
-       private long getTimeStamp(String toParse,String delimiter) throws 
ParseException {
-               
-               long ts = 0;
-               String month = null;
-               String day = null;
-               String time = null;
-               Matcher tsMatcher = tsPattern.matcher(toParse);
-               if (tsMatcher.find()) {
-                       month = tsMatcher.group(1);
-                       day = tsMatcher.group(2);
-                       time = tsMatcher.group(3);
-                       ts = ParserUtils.convertToEpoch(month, day, time, true);
-               } else {
-                       LOG.warn("Unable to find timestamp in message: {}", 
toParse);
-               }
-
-               return ts;
-       }
-
-       private JSONObject parseMessage(String toParse) {
-
-               // System.out.println("Received message: " + toParse);
-
-               // MetronMatch gm = grok.match(toParse);
-               // gm.captures();
-
-               JSONObject toReturn = new JSONObject();
-               //toParse = toParse.replaceAll("  ", " ");
-               String[] mTokens = toParse.split("\\s+");
-        //mTokens = toParse.split(" ");
-
-               // toReturn.putAll(gm.toMap());
-
-               String id = mTokens[4];
-
-               // We are not parsing the fedata for multi part message as we 
cannot
-               // determine how we can split the message and how many multi 
part
-               // messages can there be.
-               // The message itself will be stored in the response.
-
-               String[] tokens = id.split("\\.");
-               if (tokens.length == 2) {
-
-                       String[] array = Arrays.copyOfRange(mTokens, 1, 
mTokens.length - 1);
-                       String syslog = Joiner.on(" ").join(array);
-
-                       Multimap<String, String> multiMap = formatMain(syslog);
-
-                       for (String key : multiMap.keySet()) {
-
-                               String value = 
Joiner.on(",").join(multiMap.get(key));
-                               toReturn.put(key, value.trim());
-                       }
-
-               }
-
-               toReturn.put("original_string", toParse);
-
-               String ip_src_addr = (String) toReturn.get("dvc");
-               String ip_src_port = (String) toReturn.get("src_port");
-               String ip_dst_addr = (String) toReturn.get("dst_ip");
-               String ip_dst_port = (String) toReturn.get("dst_port");
-
-               if (ip_src_addr != null)
-                       toReturn.put("ip_src_addr", ip_src_addr);
-               if (ip_src_port != null)
-                       toReturn.put("ip_src_port", ip_src_port);
-               if (ip_dst_addr != null)
-                       toReturn.put("ip_dst_addr", ip_dst_addr);
-               if (ip_dst_port != null)
-                       toReturn.put("ip_dst_port", ip_dst_port);
-
-//             System.out.println(toReturn);
-
-               return toReturn;
-       }
-
-       private Multimap<String, String> formatMain(String in) {
-               Multimap<String, String> multiMap = ArrayListMultimap.create();
-               String input = in.replaceAll("cn3", "dst_port")
-                               .replaceAll("cs5", 
"cncHost").replaceAll("proto", "protocol")
-                               .replaceAll("rt=", 
"timestamp=").replaceAll("cs1", "malware")
-                               .replaceAll("dst=", "dst_ip=")
-                               .replaceAll("shost", "src_hostname")
-                               .replaceAll("dmac", 
"dst_mac").replaceAll("smac", "src_mac")
-                               .replaceAll("spt", "src_port")
-                               .replaceAll("\\bsrc\\b", "src_ip");
-               String[] tokens = input.split("\\|");
-
-               if (tokens.length > 0) {
-                       String message = tokens[tokens.length - 1];
-
-                       String pattern = 
"([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) ";
-                       Pattern p = Pattern.compile(pattern);
-                       Matcher m = p.matcher(message);
-
-                       while (m.find()) {
-                               String[] str = m.group().split("=");
-                               multiMap.put(str[0], str[1]);
-
-                       }
 
-               }
-               return multiMap;
-       }
 
-       
+public class BasicFireEyeParser extends BasicParser {
 
+  private static final long serialVersionUID = 6328907550159134550L;
+  // Logger bound to whatever class MethodHandles.lookup().lookupClass() reports here.
+  protected static final Logger LOG = LoggerFactory
+          .getLogger(MethodHandles.lookup().lookupClass());
+
+
+  // Timestamp header: three letters, a number, a colon-separated d:d:d time and a
+  // dotted-quad of digits, each separated by whitespace. getTimeStamp() reads
+  // groups 1-3 as month, day and time; group 4 is captured but not read there.
+  private static final String tsRegex = 
"([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)"
+          + "\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)";
+  private static final Pattern tsPattern = Pattern.compile(tsRegex);
+  // A positive integer in angle brackets, e.g. "<164>"; parse() uses the last match
+  // as the delimiter that separates leading text from the message proper.
+  private static final String syslogPriorityRegex = "<[1-9][0-9]*>";
+  private static final Pattern syslogPriorityPattern = 
Pattern.compile(syslogPriorityRegex);
+  // name=value pair whose value runs up to the next "name=" or end of input.
+  // The pattern ends with a literal space, so a pair only matches when a space
+  // follows its value.
+  private static final String nvRegex = 
"([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) ";
+  private static final Pattern nvPattern = Pattern.compile(nvRegex);
+
+  /**
+   * Accepts the parser configuration; this parser reads nothing from it.
+   *
+   * @param parserConfig the supplied configuration, ignored
+   */
+  @Override
+  public void configure(Map<String, Object> parserConfig) {
+    // intentionally empty
+  }
+
+  /**
+   * Initialization hook; this parser has nothing to set up.
+   */
+  @Override
+  public void init() {
+    // intentionally empty
+  }
+
+
+  /**
+   * Parses one raw FireEye message into a single-element list.
+   *
+   * <p>The bytes are decoded as UTF-8. If the text contains one or more syslog
+   * priority markers (see {@code syslogPriorityPattern}), the last marker found is
+   * used as a delimiter and only the marker plus the second piece of the split
+   * text is parsed. The parsed fields are then given a {@code timestamp} entry.
+   *
+   * @param rawMessage the raw bytes of the message
+   * @return a list holding the one parsed message
+   * @throws IllegalStateException if anything goes wrong while parsing, including
+   *     a {@code null} {@code rawMessage}; the original exception is the cause
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public List<JSONObject> parse(byte[] rawMessage) {
+    String toParse;
+    List<JSONObject> messages = new ArrayList<>();
+    try {
+
+      toParse = new String(rawMessage, StandardCharsets.UTF_8);
+
+      // because we support what is basically a malformed syslog 3164 message having
+      // some form of text before the PRIORITY, we need to use the priority as
+      // a delimiter
+      Matcher m = syslogPriorityPattern.matcher(toParse);
+
+      String delimiter = "";
+
+      // keep the last priority marker that appears in the text
+      while (m.find()) {
+        delimiter = m.group();
+      }
+
+      if (!StringUtils.isBlank(delimiter)) {
+        String[] tokens = toParse.split(delimiter);
+        if (tokens.length > 1) {
+          toParse = delimiter + tokens[1];
+        }
+      }
+
+      // parse the main message
+      JSONObject toReturn = parseMessage(toParse);
+      toReturn.put("timestamp", getTimeStamp(toParse));
+      messages.add(toReturn);
+      return messages;
+    } catch (Exception e) {
+      // Decode with the same charset as the happy path, and tolerate a null input so
+      // the failure is reported as IllegalStateException instead of a second NPE.
+      String rawText = rawMessage == null
+          ? "null"
+          : new String(rawMessage, StandardCharsets.UTF_8);
+      String message = "Unable to parse " + rawText + ": " + e.getMessage();
+      LOG.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
+  }
+
+  /**
+   * Extracts the epoch timestamp from the message header.
+   *
+   * @param toParse the message text to search with {@code tsPattern}
+   * @return the value produced by {@code ParserUtils.convertToEpoch} for the
+   *     captured month, day and time, or 0 when no timestamp is found
+   * @throws ParseException propagated from {@code ParserUtils.convertToEpoch}
+   */
+  private long getTimeStamp(String toParse) throws ParseException {
+    final Matcher header = tsPattern.matcher(toParse);
+    if (!header.find()) {
+      LOG.warn("Unable to find timestamp in message: {}", toParse);
+      return 0L;
+    }
+    // groups: 1 = month, 2 = day, 3 = time
+    return ParserUtils.convertToEpoch(header.group(1), header.group(2), header.group(3), true);
+  }
+
+  /**
+   * Builds the JSON representation of one message.
+   *
+   * <p>The text is split on whitespace and the fifth token is treated as the
+   * message id. Only when that id has exactly two dot-separated parts are the
+   * name=value fields extracted (via {@code formatMain}); multi-valued keys are
+   * joined with commas. The text is always stored under {@code original_string},
+   * and the {@code dvc}, {@code src_port}, {@code dst_ip} and {@code dst_port}
+   * fields, when present, are copied to their {@code ip_*} counterparts.
+   *
+   * @param toParse the message text
+   * @return the populated JSON object
+   */
+  @SuppressWarnings("unchecked")
+  private JSONObject parseMessage(String toParse) {
+    final JSONObject parsed = new JSONObject();
+    final String[] words = toParse.split("\\s+");
+
+    // We are not parsing the fedata for multi part message as we cannot
+    // determine how we can split the message and how many multi part
+    // messages can there be.
+    // The message itself will be stored in the response.
+    final String id = words[4];
+    if (id.split("\\.").length == 2) {
+      // NOTE(review): the exclusive upper bound of length - 1 leaves out the final
+      // token as well as the first - confirm that dropping the last token is intended.
+      final String syslog =
+          Joiner.on(" ").join(Arrays.copyOfRange(words, 1, words.length - 1));
+
+      final Multimap<String, String> fields = formatMain(syslog);
+      for (String name : fields.keySet()) {
+        parsed.put(name, Joiner.on(",").join(fields.get(name)).trim());
+      }
+    }
+
+    parsed.put("original_string", toParse);
+
+    // { field to read, field to write }
+    final String[][] aliases = {
+        {"dvc", "ip_src_addr"},
+        {"src_port", "ip_src_port"},
+        {"dst_ip", "ip_dst_addr"},
+        {"dst_port", "ip_dst_port"},
+    };
+    for (String[] alias : aliases) {
+      final String value = (String) parsed.get(alias[0]);
+      if (value != null) {
+        parsed.put(alias[1], value);
+      }
+    }
+    return parsed;
+  }
+
+  /**
+   * Extracts name=value pairs from the last pipe-delimited segment of the input.
+   *
+   * <p>Before splitting, a fixed series of regex replacements renames short field
+   * names (for example {@code cn3} to {@code dst_port}); the replacements run in
+   * the order written and apply to the whole input, not just to field names.
+   *
+   * @param in the text to scan
+   * @return every name/value pair matched by {@code nvPattern}; a name that occurs
+   *     more than once keeps all of its values
+   */
+  private Multimap<String, String> formatMain(String in) {
+    final Multimap<String, String> pairs = ArrayListMultimap.create();
+
+    final String renamed = in
+            .replaceAll("cn3", "dst_port")
+            .replaceAll("cs5", "cncHost")
+            .replaceAll("proto", "protocol")
+            .replaceAll("rt=", "timestamp=")
+            .replaceAll("cs1", "malware")
+            .replaceAll("dst=", "dst_ip=")
+            .replaceAll("shost", "src_hostname")
+            .replaceAll("dmac", "dst_mac")
+            .replaceAll("smac", "src_mac")
+            .replaceAll("spt", "src_port")
+            .replaceAll("\\bsrc\\b", "src_ip");
+
+    final String[] segments = renamed.split("\\|");
+    if (segments.length == 0) {
+      // split() drops trailing empty strings, so an input made only of pipes yields nothing
+      return pairs;
+    }
+
+    final Matcher pairMatcher = nvPattern.matcher(segments[segments.length - 1]);
+    while (pairMatcher.find()) {
+      final String[] nameAndValue = pairMatcher.group().split("=");
+      pairs.put(nameAndValue[0], nameAndValue[1]);
+    }
+    return pairs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index b7b91c0..207c070 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context;
 
 public interface MessageFilter<T> extends Configurable{
 
-       boolean emitTuple(T message, Context context);
+       boolean emit(T message, Context context);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index e3b903e..c9f8351 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -17,7 +17,12 @@
  */
 package org.apache.metron.parsers.interfaces;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.metron.parsers.DefaultMessageParserResult;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -31,23 +36,47 @@ public interface MessageParser<T> extends Configurable {
   /**
    * Take raw data and convert it to a list of messages.
    *
-   * @param rawMessage
+   * @param rawMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
-  List<T> parse(byte[] rawMessage);
+  @Deprecated
+  default List<T> parse(byte[] rawMessage) {
+    throw new NotImplementedException("parse is not implemented");
+  }
 
   /**
    * Take raw data and convert it to an optional list of messages.
-   * @param parseMessage
+   * @param parseMessage the raw bytes of the message
    * @return If null is returned, this is treated as an empty list.
    */
+  @Deprecated
   default Optional<List<T>> parseOptional(byte[] parseMessage) {
     return Optional.ofNullable(parse(parseMessage));
   }
 
   /**
+   * Take raw data and convert it to messages.  Each raw message may produce 
multiple messages and therefore
+   * multiple errors.  A {@link MessageParserResult} is returned, which will 
have both the messages produced
+   * and the errors.
+   * @param parseMessage the raw bytes of the message
+   * @return Optional of {@link MessageParserResult}
+   */
+  default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) {
+    try {
+      Optional<List<T>> parsed = parseOptional(parseMessage);
+      if (!parsed.isPresent()) {
+        // nothing was produced and nothing failed
+        return Optional.empty();
+      }
+      return Optional.of(new DefaultMessageParserResult<>(parsed.get()));
+    } catch (Throwable t) {
+      // any failure is handed back inside the result rather than propagated
+      return Optional.of(new DefaultMessageParserResult<>(t));
+    }
+  }
+
+  /**
    * Validate the message to ensure that it's correct.
-   * @param message
+   * @param message the message to validate
    * @return true if the message is valid, false if not
    */
   boolean validate(T message);

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
new file mode 100644
index 0000000..891e94f
--- /dev/null
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.interfaces;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Result object returned by {@link MessageParser} calls.
+ *
+ * @param <T> the type of the message objects returned by {@link #getMessages()}
+ */
+public interface MessageParserResult<T> {
+  /**
+   * Returns the Message objects of {@code T}.
+   *
+   * @return {@code List} of message objects
+   */
+  List<T> getMessages();
+
+  /**
+   * Returns a map of raw message objects to the {@code Throwable} they
+   * triggered.
+   *
+   * @return {@code Map} keyed by raw message object, valued by its {@code Throwable}
+   */
+  Map<Object,Throwable> getMessageThrowables();
+
+  /**
+   * Returns a master {@code Throwable} for a parse call.  This represents a
+   * complete call failure, as opposed to one associated with a message.
+   *
+   * @return {@code Optional} holding the master {@code Throwable}
+   */
+  Optional<Throwable> getMasterThrowable();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java
index 9051f09..c8e8b62 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java
@@ -34,21 +34,26 @@ import java.util.regex.Pattern;
 
 public class BasicPaloAltoFirewallParser extends BasicParser {
 
-  private static boolean empty_attribute( final String s ) {
+  private static boolean empty_attribute(final String s) {
     return s == null || s.trim().isEmpty() || s.equals("\"\"");
   }
 
-  private static String unquoted_attribute( String s ) {
+  private static String unquoted_attribute(String s) {
     s = s.trim();
-    if ( s.startsWith( "\"" ) && s.endsWith( "\"" ) )
-      return s.substring( 1, s.length( ) - 1 );
+    if (s.startsWith("\"") && s.endsWith("\""))
+      return s.substring(1, s.length() - 1);
     return s;
   }
 
-  private static final Logger _LOG = LoggerFactory.getLogger
-          (BasicPaloAltoFirewallParser.class);
+  private static final Logger _LOG = 
LoggerFactory.getLogger(BasicPaloAltoFirewallParser.class);
 
   private static final long serialVersionUID = 3147090149725343999L;
+
+  private static final String LogTypeConfig = "CONFIG";
+  private static final String LogTypeSystem = "SYSTEM";
+  private static final String LogTypeThreat = "THREAT";
+  private static final String LogTypeTraffic = "TRAFFIC";
+
   public static final String PaloAltoDomain = "palo_alto_domain";
   public static final String ReceiveTime = "receive_time";
   public static final String SerialNum = "serial";
@@ -101,6 +106,21 @@ public class BasicPaloAltoFirewallParser extends 
BasicParser {
   public static final String ParentSessionStartTime = 
"parent_session_start_time";
   public static final String TunnelType = "tunnel_type";
 
+  //System
+  public static final String EventId = "event_id";
+  public static final String Object = "object";
+  public static final String Module = "module";
+  public static final String Description = "description";
+
+  //Config
+  public static final String Command = "command";
+  public static final String Admin = "admin";
+  public static final String Client = "client";
+  public static final String Result = "result";
+  public static final String ConfigurationPath = "configuration_path";
+  public static final String BeforeChangeDetail = "before_change_detail";
+  public static final String AfterChangeDetail = "after_change_detail";
+
   //Threat
   public static final String URL = "url";
   public static final String HOST = "host";
@@ -113,7 +133,7 @@ public class BasicPaloAltoFirewallParser extends 
BasicParser {
   public static final String PCAPID = "pcap_id";
   public static final String WFFileDigest = "filedigest";
   public static final String WFCloud = "cloud";
-  public static final String UserAgent= "user_agent";
+  public static final String UserAgent = "user_agent";
   public static final String WFFileType = "filetype";
   public static final String XForwardedFor = "xff";
   public static final String Referer = "referer";
@@ -159,8 +179,6 @@ public class BasicPaloAltoFirewallParser extends 
BasicParser {
 
       toParse = new String(msg, "UTF-8");
       _LOG.debug("Received message: {}", toParse);
-
-
       parseMessage(toParse, outputMessage);
       long timestamp = System.currentTimeMillis();
       outputMessage.put("timestamp", System.currentTimeMillis());
@@ -178,159 +196,258 @@ public class BasicPaloAltoFirewallParser extends 
BasicParser {
   private void parseMessage(String message, JSONObject outputMessage) {
 
     String[] tokens = 
Iterables.toArray(Splitter.on(Pattern.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).split(message),
 String.class);
+
     int parser_version = 0;
 
     String type = tokens[3].trim();
 
+    //validate log types
+    if (!type.equals(LogTypeConfig) &&
+        !type.equals(LogTypeThreat) &&
+        !type.equals(LogTypeTraffic) &&
+        !type.equals(LogTypeSystem)) {
+      throw new UnsupportedOperationException("Unsupported log type.");
+    }
+
     //populate common objects
-    if( !empty_attribute( tokens[0] ) ) outputMessage.put(PaloAltoDomain, 
tokens[0].trim());
-    if( !empty_attribute( tokens[1] ) ) outputMessage.put(ReceiveTime, 
tokens[1].trim());
-    if( !empty_attribute( tokens[2] ) ) outputMessage.put(SerialNum, 
tokens[2].trim());
+    if (!empty_attribute(tokens[0])) outputMessage.put(PaloAltoDomain, 
tokens[0].trim());
+    if (!empty_attribute(tokens[1])) outputMessage.put(ReceiveTime, 
tokens[1].trim());
+    if (!empty_attribute(tokens[2])) outputMessage.put(SerialNum, 
tokens[2].trim());
     outputMessage.put(Type, type);
-    if( !empty_attribute( tokens[4] ) ) outputMessage.put(ThreatContentType, 
unquoted_attribute(tokens[4]));
-    if( !empty_attribute( tokens[5] ) ) outputMessage.put(ConfigVersion, 
tokens[5].trim());
-    if( !empty_attribute( tokens[6] ) ) outputMessage.put(GenerateTime, 
tokens[6].trim());
-    if( !empty_attribute( tokens[7] ) ) outputMessage.put(SourceAddress, 
tokens[7].trim());
-    if( !empty_attribute( tokens[8] ) ) outputMessage.put(DestinationAddress, 
tokens[8].trim());
-    if( !empty_attribute( tokens[9] ) ) outputMessage.put(NATSourceIP, 
tokens[9].trim());
-    if( !empty_attribute( tokens[10] ) ) outputMessage.put(NATDestinationIP, 
tokens[10].trim());
-    if( !empty_attribute( tokens[11] ) ) outputMessage.put(Rule, 
unquoted_attribute(tokens[11]));
-    if( !empty_attribute( tokens[12] ) ) outputMessage.put(SourceUser, 
unquoted_attribute(tokens[12]));
-    if( !empty_attribute( tokens[13] ) ) outputMessage.put(DestinationUser, 
unquoted_attribute(tokens[13]));
-    if( !empty_attribute( tokens[14] ) ) outputMessage.put(Application, 
unquoted_attribute(tokens[14]));
-    if( !empty_attribute( tokens[15] ) ) outputMessage.put(VirtualSystem, 
unquoted_attribute(tokens[15]));
-    if( !empty_attribute( tokens[16] ) ) outputMessage.put(SourceZone, 
unquoted_attribute(tokens[16]));
-    if( !empty_attribute( tokens[17] ) ) outputMessage.put(DestinationZone, 
unquoted_attribute(tokens[17]));
-    if( !empty_attribute( tokens[18] ) ) outputMessage.put(InboundInterface, 
unquoted_attribute(tokens[18]));
-    if( !empty_attribute( tokens[19] ) ) outputMessage.put(OutboundInterface, 
unquoted_attribute(tokens[19]));
-    if( !empty_attribute( tokens[20] ) ) outputMessage.put(LogAction, 
unquoted_attribute(tokens[20]));
-    if( !empty_attribute( tokens[21] ) ) outputMessage.put(TimeLogged, 
tokens[21].trim());
-    if( !empty_attribute( tokens[22] ) ) outputMessage.put(SessionID, 
tokens[22].trim());
-    if( !empty_attribute( tokens[23] ) ) outputMessage.put(RepeatCount, 
tokens[23].trim());
-    if( !empty_attribute( tokens[24] ) ) outputMessage.put(SourcePort, 
tokens[24].trim());
-    if( !empty_attribute( tokens[25] ) ) outputMessage.put(DestinationPort, 
tokens[25].trim());
-    if( !empty_attribute( tokens[26] ) ) outputMessage.put(NATSourcePort, 
tokens[26].trim());
-    if( !empty_attribute( tokens[27] ) ) outputMessage.put(NATDestinationPort, 
tokens[27].trim());
-    if( !empty_attribute( tokens[28] ) ) outputMessage.put(Flags, 
tokens[28].trim());
-    if( !empty_attribute( tokens[29] ) ) outputMessage.put(IPProtocol, 
unquoted_attribute(tokens[29]));
-    if( !empty_attribute( tokens[30] ) ) outputMessage.put(Action, 
unquoted_attribute(tokens[30]));
-
-
-    if ("THREAT".equals(type.toUpperCase())) {
-      int p1_offset = 0;
-      if      (tokens.length == 45) parser_version = 60;
-      else if (tokens.length == 53) parser_version = 61;
-      else if (tokens.length == 61) {
-        parser_version = 70;
-        p1_offset = 1;
-      }
-      else if (tokens.length == 72) {
-        parser_version = 80;
-        p1_offset =1;
+    if (!empty_attribute(tokens[4])) outputMessage.put(ThreatContentType, 
unquoted_attribute(tokens[4]));
+    if (!empty_attribute(tokens[5])) outputMessage.put(ConfigVersion, 
tokens[5].trim());
+    if (!empty_attribute(tokens[6])) outputMessage.put(GenerateTime, 
tokens[6].trim());
+
+    if (LogTypeConfig.equals(type.toUpperCase())) {
+      // There are two fields in custom logs only and they are not in the 
default format.
+      // But we need to parse them if they exist
+      if (tokens.length == 16 || tokens.length == 18) parser_version = 61;
+      else if (tokens.length == 22 || tokens.length == 24) parser_version = 80;
+
+      if (parser_version >= 61) {
+        if (!empty_attribute(tokens[7])) outputMessage.put(HOST, 
tokens[7].trim());
+        if (!empty_attribute(tokens[8])) outputMessage.put(VirtualSystem, 
tokens[8].trim());
+        if (!empty_attribute(tokens[9])) outputMessage.put(Command, 
tokens[9].trim());
+        if (!empty_attribute(tokens[10])) outputMessage.put(Admin, 
tokens[10].trim());
+        if (!empty_attribute(tokens[11])) outputMessage.put(Client, 
unquoted_attribute(tokens[11]));
+        if (!empty_attribute(tokens[12])) outputMessage.put(Result, 
unquoted_attribute(tokens[12]));
+        if (!empty_attribute(tokens[13])) outputMessage.put(ConfigurationPath, 
unquoted_attribute(tokens[13]));
       }
-      outputMessage.put(ParserVersion, parser_version);
-      if( !empty_attribute( tokens[31] ) ) {
-        outputMessage.put(URL, unquoted_attribute(tokens[31]));
-        try {
-            URL url = new URL(unquoted_attribute(tokens[31]));
-            outputMessage.put(HOST, url.getHost());
-        } catch (MalformedURLException e) {
+
+      if (parser_version == 61) {
+        if (!empty_attribute(tokens[14])) outputMessage.put(Seqno, 
unquoted_attribute(tokens[14]));
+        if (!empty_attribute(tokens[15])) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[15]));
+        if (tokens.length == 18) {
+          if (!empty_attribute(tokens[16]))
+            outputMessage.put(BeforeChangeDetail, 
unquoted_attribute(tokens[16]));
+          if (!empty_attribute(tokens[17]))
+            outputMessage.put(AfterChangeDetail, 
unquoted_attribute(tokens[17]));
         }
       }
-      if( !empty_attribute( tokens[32] ) ) outputMessage.put(ThreatID, 
tokens[32].trim());
-      if( !empty_attribute( tokens[33] ) ) outputMessage.put(Category, 
unquoted_attribute(tokens[33]));
-      if( !empty_attribute( tokens[34] ) ) outputMessage.put(Severity, 
unquoted_attribute(tokens[34]));
-      if( !empty_attribute( tokens[35] ) ) outputMessage.put(Direction, 
unquoted_attribute(tokens[35]));
-      if( !empty_attribute( tokens[36] ) ) outputMessage.put(Seqno, 
tokens[36].trim());
-      if( !empty_attribute( tokens[37] ) ) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[37]));
-      if( !empty_attribute( tokens[38] ) ) outputMessage.put(SourceLocation, 
unquoted_attribute(tokens[38]));
-      if( !empty_attribute( tokens[39] ) ) 
outputMessage.put(DestinationLocation, unquoted_attribute(tokens[39]));
-      if( !empty_attribute( tokens[41] ) ) outputMessage.put(ContentType, 
unquoted_attribute(tokens[41]));
-      if( !empty_attribute( tokens[42] ) ) outputMessage.put(PCAPID, 
tokens[42].trim());
-      if( !empty_attribute( tokens[43] ) ) outputMessage.put(WFFileDigest, 
unquoted_attribute(tokens[43]));
-      if( !empty_attribute( tokens[44] ) ) outputMessage.put(WFCloud, 
unquoted_attribute(tokens[44]));
-      if ( parser_version >= 61) {
-        if( !empty_attribute( tokens[(45 + p1_offset)] ) ) 
outputMessage.put(UserAgent, unquoted_attribute(tokens[(45 + p1_offset)]));
-        if( !empty_attribute( tokens[(46 + p1_offset)] ) ) 
outputMessage.put(WFFileType, unquoted_attribute(tokens[(46 + p1_offset)]));
-        if( !empty_attribute( tokens[(47 + p1_offset)] ) ) 
outputMessage.put(XForwardedFor, unquoted_attribute(tokens[(47 + p1_offset)]));
-        if( !empty_attribute( tokens[(48 + p1_offset)] ) ) 
outputMessage.put(Referer, unquoted_attribute(tokens[(48 + p1_offset)]));
-        if( !empty_attribute( tokens[(49 + p1_offset)] ) ) 
outputMessage.put(WFSender, unquoted_attribute(tokens[(49 + p1_offset)]));
-        if( !empty_attribute( tokens[(50 + p1_offset)] ) ) 
outputMessage.put(WFSubject, unquoted_attribute(tokens[(50 + p1_offset)]));
-        if( !empty_attribute( tokens[(51 + p1_offset)] ) ) 
outputMessage.put(WFRecipient, unquoted_attribute(tokens[(51 + p1_offset)]));
-        if( !empty_attribute( tokens[(52 + p1_offset)] ) ) 
outputMessage.put(WFReportID, unquoted_attribute(tokens[(52 + p1_offset)]));
-      }
-      if ( parser_version >= 70) { 
-        if( !empty_attribute( tokens[45] ) ) outputMessage.put(URLIndex, 
tokens[45].trim());
-        if( !empty_attribute( tokens[54] ) ) outputMessage.put(DGH1, 
tokens[54].trim());
-        if( !empty_attribute( tokens[55] ) ) outputMessage.put(DGH2, 
tokens[55].trim());
-        if( !empty_attribute( tokens[56] ) ) outputMessage.put(DGH3, 
tokens[56].trim());
-        if( !empty_attribute( tokens[57] ) ) outputMessage.put(DGH4, 
tokens[57].trim());
-        if( !empty_attribute( tokens[58] ) ) outputMessage.put(VSYSName, 
unquoted_attribute(tokens[58]));
-        if( !empty_attribute( tokens[59] ) ) outputMessage.put(DeviceName, 
unquoted_attribute(tokens[59]));
-      }
-      if ( parser_version >= 80) {
-        if( !empty_attribute( tokens[61] ) ) outputMessage.put(SourceVmUuid, 
tokens[61].trim());
-        if( !empty_attribute( tokens[62] ) ) 
outputMessage.put(DestinationVmUuid, tokens[62].trim());
-        if( !empty_attribute( tokens[63] ) ) outputMessage.put(HTTPMethod, 
tokens[63].trim());
-        if( !empty_attribute( tokens[64] ) ) outputMessage.put(TunnelId, 
tokens[64].trim());
-        if( !empty_attribute( tokens[65] ) ) outputMessage.put(MonitorTag, 
tokens[65].trim());
-        if( !empty_attribute( tokens[66] ) ) 
outputMessage.put(ParentSessionId, tokens[66].trim());
-        if( !empty_attribute( tokens[67] ) ) 
outputMessage.put(ParentSessionStartTime, tokens[67].trim());
-        if( !empty_attribute( tokens[68] ) ) outputMessage.put(TunnelType, 
tokens[68].trim());
-        if( !empty_attribute( tokens[69] ) ) outputMessage.put(ThreatCategory, 
tokens[69].trim());
-        if( !empty_attribute( tokens[70] ) ) outputMessage.put(ContentVersion, 
tokens[70].trim());
-      }
-      if ( parser_version == 0) {
-        outputMessage.put(Tokens, tokens.length);
-      }
 
-
-    } else if ("TRAFFIC".equals(type.toUpperCase())) {
-      if      (tokens.length == 46) parser_version = 60;
-      else if (tokens.length == 47) parser_version = 61;
-      else if (tokens.length == 54) parser_version = 70;
-      else if (tokens.length == 61) parser_version = 80;
-      outputMessage.put(ParserVersion, parser_version);
-      if( !empty_attribute( tokens[31] ) ) outputMessage.put(Bytes, 
tokens[31].trim());
-      if( !empty_attribute( tokens[32] ) ) outputMessage.put(BytesSent, 
tokens[32].trim());
-      if( !empty_attribute( tokens[33] ) ) outputMessage.put(BytesReceived, 
tokens[33].trim());
-      if( !empty_attribute( tokens[34] ) ) outputMessage.put(Packets, 
tokens[34].trim());
-      if( !empty_attribute( tokens[35] ) ) outputMessage.put(StartTime, 
tokens[35].trim());
-      if( !empty_attribute( tokens[36] ) ) outputMessage.put(ElapsedTimeInSec, 
tokens[36].trim());
-      if( !empty_attribute( tokens[37] ) ) outputMessage.put(Category, 
unquoted_attribute(tokens[37]));
-      if( !empty_attribute( tokens[39] ) ) outputMessage.put(Seqno, 
tokens[39].trim());
-      if( !empty_attribute( tokens[40] ) ) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[40]));
-      if( !empty_attribute( tokens[41] ) ) outputMessage.put(SourceLocation, 
unquoted_attribute(tokens[41]));
-      if( !empty_attribute( tokens[42] ) ) 
outputMessage.put(DestinationLocation, unquoted_attribute(tokens[42]));
-      if( !empty_attribute( tokens[44] ) ) outputMessage.put(PktsSent, 
tokens[44].trim());
-      if( !empty_attribute( tokens[45] ) ) outputMessage.put(PktsReceived, 
tokens[45].trim());
-      if ( parser_version >= 61) {
-        if( !empty_attribute( tokens[46] ) ) outputMessage.put(EndReason, 
unquoted_attribute(tokens[46]));
+      if (parser_version >= 70) {
+        int custom_fields_offset = 0;
+        if (tokens.length == 24) {
+          if (!empty_attribute(tokens[14])) {
+            outputMessage.put(BeforeChangeDetail, unquoted_attribute(tokens[14 + custom_fields_offset]));
+          }
+          if (!empty_attribute(tokens[15])) {
+            outputMessage.put(AfterChangeDetail, unquoted_attribute(tokens[15 + custom_fields_offset]));
+          }
+          custom_fields_offset = 2;
+        }
+        if (!empty_attribute(tokens[14 + custom_fields_offset])) {
+          outputMessage.put(Seqno, unquoted_attribute(tokens[14 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[15 + custom_fields_offset])) {
+          outputMessage.put(ActionFlags, unquoted_attribute(tokens[15 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[16 + custom_fields_offset])) {
+          outputMessage.put(DGH1, unquoted_attribute(tokens[16 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[17 + custom_fields_offset])) {
+          outputMessage.put(DGH2, unquoted_attribute(tokens[17 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[18 + custom_fields_offset])) {
+          outputMessage.put(DGH3, unquoted_attribute(tokens[18 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[19 + custom_fields_offset])) {
+          outputMessage.put(DGH4, unquoted_attribute(tokens[19 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[20 + custom_fields_offset])) {
+          outputMessage.put(VSYSName, unquoted_attribute(tokens[20 + 
custom_fields_offset]));
+        }
+        if (!empty_attribute(tokens[21 + custom_fields_offset])) {
+          outputMessage.put(DeviceName, unquoted_attribute(tokens[21 + 
custom_fields_offset]));
+        }
       }
-      if ( parser_version >= 70) {
-        if( !empty_attribute( tokens[47] ) ) outputMessage.put(DGH1, 
tokens[47].trim());
-        if( !empty_attribute( tokens[48] ) ) outputMessage.put(DGH2, 
tokens[48].trim());
-        if( !empty_attribute( tokens[49] ) ) outputMessage.put(DGH3, 
tokens[49].trim());
-        if( !empty_attribute( tokens[50] ) ) outputMessage.put(DGH4, 
tokens[50].trim());
-        if( !empty_attribute( tokens[51] ) ) outputMessage.put(VSYSName, 
unquoted_attribute(tokens[51]));
-        if( !empty_attribute( tokens[52] ) ) outputMessage.put(DeviceName, 
unquoted_attribute(tokens[52]));
-        if( !empty_attribute( tokens[53] ) ) outputMessage.put(ActionSource, 
unquoted_attribute(tokens[53]));
+    } else if (LogTypeSystem.equals(type.toUpperCase())) {
+      if (tokens.length == 17) parser_version = 61;
+      else if (tokens.length == 23) parser_version = 80;
+
+      if (parser_version >= 61) {
+        if (!empty_attribute(tokens[7])) outputMessage.put(VirtualSystem, 
tokens[7].trim());
+        if (!empty_attribute(tokens[8])) outputMessage.put(EventId, 
tokens[8].trim());
+        if (!empty_attribute(tokens[9])) outputMessage.put(Object, 
tokens[9].trim());
+
+        if (!empty_attribute(tokens[12])) outputMessage.put(Module, 
tokens[12].trim());
+        if (!empty_attribute(tokens[13])) outputMessage.put(Severity, 
unquoted_attribute(tokens[13]));
+        if (!empty_attribute(tokens[14])) outputMessage.put(Description, 
unquoted_attribute(tokens[14]));
+        if (!empty_attribute(tokens[15])) outputMessage.put(Seqno, 
unquoted_attribute(tokens[15]));
+        if (!empty_attribute(tokens[16])) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[16]));
       }
-      if ( parser_version >= 80) {
-        if( !empty_attribute( tokens[54] ) ) outputMessage.put(SourceVmUuid, 
tokens[54].trim());
-        if( !empty_attribute( tokens[55] ) ) 
outputMessage.put(DestinationVmUuid, tokens[55].trim());
-        if( !empty_attribute( tokens[56] ) ) outputMessage.put(TunnelId, 
tokens[56].trim());
-        if( !empty_attribute( tokens[57] ) ) outputMessage.put(MonitorTag, 
tokens[57].trim());
-        if( !empty_attribute( tokens[58] ) ) 
outputMessage.put(ParentSessionId, tokens[58].trim());
-        if( !empty_attribute( tokens[59] ) ) 
outputMessage.put(ParentSessionStartTime, tokens[59].trim());
-        if( !empty_attribute( tokens[60] ) ) outputMessage.put(TunnelType, 
tokens[60].trim());
+
+      if (parser_version == 80) {
+        if (!empty_attribute(tokens[17])) outputMessage.put(DGH1, 
tokens[17].trim());
+        if (!empty_attribute(tokens[18])) outputMessage.put(DGH2, 
tokens[18].trim());
+        if (!empty_attribute(tokens[19])) outputMessage.put(DGH3, 
tokens[19].trim());
+        if (!empty_attribute(tokens[20])) outputMessage.put(DGH4, 
tokens[20].trim());
+        if (!empty_attribute(tokens[21])) outputMessage.put(VSYSName, 
unquoted_attribute(tokens[21]));
+        if (!empty_attribute(tokens[22])) outputMessage.put(DeviceName, 
unquoted_attribute(tokens[22]));
       }
-      if ( parser_version == 0) {
-        outputMessage.put(Tokens, tokens.length);
+    } else if (LogTypeThreat.equals(type.toUpperCase()) ||
+               LogTypeTraffic.equals(type.toUpperCase())) {
+      if (!empty_attribute(tokens[7])) outputMessage.put(SourceAddress, 
tokens[7].trim());
+      if (!empty_attribute(tokens[8])) outputMessage.put(DestinationAddress, 
tokens[8].trim());
+      if (!empty_attribute(tokens[9])) outputMessage.put(NATSourceIP, 
tokens[9].trim());
+      if (!empty_attribute(tokens[10])) outputMessage.put(NATDestinationIP, 
tokens[10].trim());
+      if (!empty_attribute(tokens[11])) outputMessage.put(Rule, 
unquoted_attribute(tokens[11]));
+      if (!empty_attribute(tokens[12])) outputMessage.put(SourceUser, 
unquoted_attribute(tokens[12]));
+      if (!empty_attribute(tokens[13])) outputMessage.put(DestinationUser, 
unquoted_attribute(tokens[13]));
+      if (!empty_attribute(tokens[14])) outputMessage.put(Application, 
unquoted_attribute(tokens[14]));
+      if (!empty_attribute(tokens[15])) outputMessage.put(VirtualSystem, 
unquoted_attribute(tokens[15]));
+      if (!empty_attribute(tokens[16])) outputMessage.put(SourceZone, 
unquoted_attribute(tokens[16]));
+      if (!empty_attribute(tokens[17])) outputMessage.put(DestinationZone, 
unquoted_attribute(tokens[17]));
+      if (!empty_attribute(tokens[18])) outputMessage.put(InboundInterface, 
unquoted_attribute(tokens[18]));
+      if (!empty_attribute(tokens[19])) outputMessage.put(OutboundInterface, 
unquoted_attribute(tokens[19]));
+      if (!empty_attribute(tokens[20])) outputMessage.put(LogAction, 
unquoted_attribute(tokens[20]));
+      if (!empty_attribute(tokens[21])) outputMessage.put(TimeLogged, 
tokens[21].trim());
+      if (!empty_attribute(tokens[22])) outputMessage.put(SessionID, 
tokens[22].trim());
+      if (!empty_attribute(tokens[23])) outputMessage.put(RepeatCount, 
tokens[23].trim());
+      if (!empty_attribute(tokens[24])) outputMessage.put(SourcePort, 
tokens[24].trim());
+      if (!empty_attribute(tokens[25])) outputMessage.put(DestinationPort, 
tokens[25].trim());
+      if (!empty_attribute(tokens[26])) outputMessage.put(NATSourcePort, 
tokens[26].trim());
+      if (!empty_attribute(tokens[27])) outputMessage.put(NATDestinationPort, 
tokens[27].trim());
+      if (!empty_attribute(tokens[28])) outputMessage.put(Flags, 
tokens[28].trim());
+      if (!empty_attribute(tokens[29])) outputMessage.put(IPProtocol, 
unquoted_attribute(tokens[29]));
+      if (!empty_attribute(tokens[30])) outputMessage.put(Action, 
unquoted_attribute(tokens[30]));
+
+      if (LogTypeThreat.equals(type.toUpperCase())) {
+        int p1_offset = 0;
+        if      (tokens.length == 45) parser_version = 60;
+        else if (tokens.length == 53) parser_version = 61;
+        else if (tokens.length == 61) {
+          parser_version = 70;
+          p1_offset = 1;
+        } else if (tokens.length == 72) {
+          parser_version = 80;
+          p1_offset = 1;
+        }
+        if (!empty_attribute(tokens[31])) {
+          outputMessage.put(URL, unquoted_attribute(tokens[31]));
+          try {
+            URL url = new URL(unquoted_attribute(tokens[31]));
+            outputMessage.put(HOST, url.getHost());
+          } catch (MalformedURLException e) {
+          }
+        }
+        if (!empty_attribute(tokens[32])) outputMessage.put(ThreatID, 
tokens[32].trim());
+        if (!empty_attribute(tokens[33])) outputMessage.put(Category, 
unquoted_attribute(tokens[33]));
+        if (!empty_attribute(tokens[34])) outputMessage.put(Severity, 
unquoted_attribute(tokens[34]));
+        if (!empty_attribute(tokens[35])) outputMessage.put(Direction, 
unquoted_attribute(tokens[35]));
+        if (!empty_attribute(tokens[36])) outputMessage.put(Seqno, 
tokens[36].trim());
+        if (!empty_attribute(tokens[37])) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[37]));
+        if (!empty_attribute(tokens[38])) outputMessage.put(SourceLocation, 
unquoted_attribute(tokens[38]));
+        if (!empty_attribute(tokens[39]))
+          outputMessage.put(DestinationLocation, 
unquoted_attribute(tokens[39]));
+        if (!empty_attribute(tokens[41])) outputMessage.put(ContentType, 
unquoted_attribute(tokens[41]));
+        if (!empty_attribute(tokens[42])) outputMessage.put(PCAPID, 
tokens[42].trim());
+        if (!empty_attribute(tokens[43])) outputMessage.put(WFFileDigest, 
unquoted_attribute(tokens[43]));
+        if (!empty_attribute(tokens[44])) outputMessage.put(WFCloud, 
unquoted_attribute(tokens[44]));
+        if (parser_version >= 61) {
+          if (!empty_attribute(tokens[(45 + p1_offset)]))
+            outputMessage.put(UserAgent, unquoted_attribute(tokens[(45 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(46 + p1_offset)]))
+            outputMessage.put(WFFileType, unquoted_attribute(tokens[(46 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(47 + p1_offset)]))
+            outputMessage.put(XForwardedFor, unquoted_attribute(tokens[(47 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(48 + p1_offset)]))
+            outputMessage.put(Referer, unquoted_attribute(tokens[(48 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(49 + p1_offset)]))
+            outputMessage.put(WFSender, unquoted_attribute(tokens[(49 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(50 + p1_offset)]))
+            outputMessage.put(WFSubject, unquoted_attribute(tokens[(50 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(51 + p1_offset)]))
+            outputMessage.put(WFRecipient, unquoted_attribute(tokens[(51 + 
p1_offset)]));
+          if (!empty_attribute(tokens[(52 + p1_offset)]))
+            outputMessage.put(WFReportID, unquoted_attribute(tokens[(52 + 
p1_offset)]));
+        }
+        if (parser_version >= 70) {
+          if (!empty_attribute(tokens[45])) outputMessage.put(URLIndex, 
tokens[45].trim());
+          if (!empty_attribute(tokens[54])) outputMessage.put(DGH1, 
tokens[54].trim());
+          if (!empty_attribute(tokens[55])) outputMessage.put(DGH2, 
tokens[55].trim());
+          if (!empty_attribute(tokens[56])) outputMessage.put(DGH3, 
tokens[56].trim());
+          if (!empty_attribute(tokens[57])) outputMessage.put(DGH4, 
tokens[57].trim());
+          if (!empty_attribute(tokens[58])) outputMessage.put(VSYSName, 
unquoted_attribute(tokens[58]));
+          if (!empty_attribute(tokens[59])) outputMessage.put(DeviceName, 
unquoted_attribute(tokens[59]));
+        }
+        if (parser_version >= 80) {
+          if (!empty_attribute(tokens[61])) outputMessage.put(SourceVmUuid, 
tokens[61].trim());
+          if (!empty_attribute(tokens[62])) 
outputMessage.put(DestinationVmUuid, tokens[62].trim());
+          if (!empty_attribute(tokens[63])) outputMessage.put(HTTPMethod, 
tokens[63].trim());
+          if (!empty_attribute(tokens[64])) outputMessage.put(TunnelId, 
tokens[64].trim());
+          if (!empty_attribute(tokens[65])) outputMessage.put(MonitorTag, 
tokens[65].trim());
+          if (!empty_attribute(tokens[66])) outputMessage.put(ParentSessionId, 
tokens[66].trim());
+          if (!empty_attribute(tokens[67])) 
outputMessage.put(ParentSessionStartTime, tokens[67].trim());
+          if (!empty_attribute(tokens[68])) outputMessage.put(TunnelType, 
tokens[68].trim());
+          if (!empty_attribute(tokens[69])) outputMessage.put(ThreatCategory, 
tokens[69].trim());
+          if (!empty_attribute(tokens[70])) outputMessage.put(ContentVersion, 
tokens[70].trim());
+        }
+      } else if (LogTypeTraffic.equals(type.toUpperCase())) {
+        if (tokens.length == 46) parser_version = 60;
+        else if (tokens.length == 47) parser_version = 61;
+        else if (tokens.length == 54) parser_version = 70;
+        else if (tokens.length == 61) parser_version = 80;
+        if (!empty_attribute(tokens[31])) outputMessage.put(Bytes, 
tokens[31].trim());
+        if (!empty_attribute(tokens[32])) outputMessage.put(BytesSent, 
tokens[32].trim());
+        if (!empty_attribute(tokens[33])) outputMessage.put(BytesReceived, 
tokens[33].trim());
+        if (!empty_attribute(tokens[34])) outputMessage.put(Packets, 
tokens[34].trim());
+        if (!empty_attribute(tokens[35])) outputMessage.put(StartTime, 
tokens[35].trim());
+        if (!empty_attribute(tokens[36])) outputMessage.put(ElapsedTimeInSec, 
tokens[36].trim());
+        if (!empty_attribute(tokens[37])) outputMessage.put(Category, 
unquoted_attribute(tokens[37]));
+        if (!empty_attribute(tokens[39])) outputMessage.put(Seqno, 
tokens[39].trim());
+        if (!empty_attribute(tokens[40])) outputMessage.put(ActionFlags, 
unquoted_attribute(tokens[40]));
+        if (!empty_attribute(tokens[41])) outputMessage.put(SourceLocation, 
unquoted_attribute(tokens[41]));
+        if (!empty_attribute(tokens[42]))
+          outputMessage.put(DestinationLocation, 
unquoted_attribute(tokens[42]));
+        if (!empty_attribute(tokens[44])) outputMessage.put(PktsSent, 
tokens[44].trim());
+        if (!empty_attribute(tokens[45])) outputMessage.put(PktsReceived, 
tokens[45].trim());
+        if (parser_version >= 61) {
+          if (!empty_attribute(tokens[46])) outputMessage.put(EndReason, 
unquoted_attribute(tokens[46]));
+        }
+        if (parser_version >= 70) {
+          if (!empty_attribute(tokens[47])) outputMessage.put(DGH1, 
tokens[47].trim());
+          if (!empty_attribute(tokens[48])) outputMessage.put(DGH2, 
tokens[48].trim());
+          if (!empty_attribute(tokens[49])) outputMessage.put(DGH3, 
tokens[49].trim());
+          if (!empty_attribute(tokens[50])) outputMessage.put(DGH4, 
tokens[50].trim());
+          if (!empty_attribute(tokens[51])) outputMessage.put(VSYSName, 
unquoted_attribute(tokens[51]));
+          if (!empty_attribute(tokens[52])) outputMessage.put(DeviceName, 
unquoted_attribute(tokens[52]));
+          if (!empty_attribute(tokens[53])) outputMessage.put(ActionSource, 
unquoted_attribute(tokens[53]));
+        }
+        if (parser_version >= 80) {
+          if (!empty_attribute(tokens[54])) outputMessage.put(SourceVmUuid, 
tokens[54].trim());
+          if (!empty_attribute(tokens[55])) 
outputMessage.put(DestinationVmUuid, tokens[55].trim());
+          if (!empty_attribute(tokens[56])) outputMessage.put(TunnelId, 
tokens[56].trim());
+          if (!empty_attribute(tokens[57])) outputMessage.put(MonitorTag, 
tokens[57].trim());
+          if (!empty_attribute(tokens[58])) outputMessage.put(ParentSessionId, 
tokens[58].trim());
+          if (!empty_attribute(tokens[59])) 
outputMessage.put(ParentSessionStartTime, tokens[59].trim());
+          if (!empty_attribute(tokens[60])) outputMessage.put(TunnelType, 
tokens[60].trim());
+        }
       }
     }
-
+    outputMessage.put(ParserVersion, parser_version);
+    if (parser_version == 0) {
+      outputMessage.put(Tokens, tokens.length);
+    }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md
new file mode 100644
index 0000000..b6a1230
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md
@@ -0,0 +1,32 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# BasicPaloAltoFirewallParser
+## Introduction
+The parser is able to parse logs generated by Palo Alto firewall devices powered by Pan OS. The supported log format is CSV.
+
+The supported log types and versions
+
+| Log type | Pan OS version |
+|----------|----------------|
+| Traffic | 6.0, 6.1, 7.0, 8.0 |
+| Threat  | 6.0, 6.1, 7.0, 8.0 |
+| Config  | 6.1, 7.0, 8.0 |
+| System  | 6.1, 7.0, 8.0 |
+
+
+

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
index e3ad941..5b62e85 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java
@@ -25,23 +25,35 @@ import com.github.palindromicity.syslog.SyslogParserBuilder;
 import com.github.palindromicity.syslog.dsl.SyslogFieldKeys;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.parsers.BasicParser;
+import org.apache.metron.parsers.DefaultMessageParserResult;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.Reader;
+import java.io.Serializable;
 import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 
 /**
  * Parser for well structured RFC 5424 messages.
  */
-public class Syslog5424Parser extends BasicParser {
+public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable {
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String NIL_POLICY_CONFIG = "nilPolicy";
   private transient SyslogParser syslogParser;
 
@@ -62,15 +74,31 @@ public class Syslog5424Parser extends BasicParser {
   }
 
   @Override
+  public boolean validate(JSONObject message) {
+    JSONObject value = message;
+    if (!(value.containsKey("original_string"))) {
+      LOG.trace("[Metron] Message does not have original_string: {}", message);
+      return false;
+    } else if (!(value.containsKey("timestamp"))) {
+      LOG.trace("[Metron] Message does not have timestamp: {}", message);
+      return false;
+    } else {
+      LOG.trace("[Metron] Message conforms to schema: {}", message);
+      return true;
+    }
+  }
+
+  @Override
   @SuppressWarnings("unchecked")
-  public List<JSONObject> parse(byte[] rawMessage) {
+  public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) {
     try {
       if (rawMessage == null || rawMessage.length == 0) {
-        return null;
+        return Optional.empty();
       }
 
       String originalString = new String(rawMessage);
       List<JSONObject> returnList = new ArrayList<>();
+      Map<Object,Throwable> errorMap = new HashMap<>();
       try (Reader reader = new BufferedReader(new StringReader(originalString))) {
         syslogParser.parseLines(reader, (m) -> {
           JSONObject jsonObject = new JSONObject(m);
@@ -79,14 +107,14 @@ public class Syslog5424Parser extends BasicParser {
           jsonObject.put("original_string", originalString);
           setTimestamp(jsonObject);
           returnList.add(jsonObject);
-        });
+        },errorMap::put);
 
-        return returnList;
+        return Optional.of(new DefaultMessageParserResult<JSONObject>(returnList,errorMap));
       }
-    } catch (Exception e) {
-      String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage();
+    } catch (IOException e) {
+      String message = "Unable to read buffer " + new String(rawMessage) + ": " + e.getMessage();
       LOG.error(message, e);
-      throw new IllegalStateException(message, e);
+      return Optional.of(new DefaultMessageParserResult<JSONObject>( new IllegalStateException(message, e)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
new file mode 100644
index 0000000..eb5ff9f
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+public class ParserComponent implements Serializable {
+  private static final long serialVersionUID = 7880346740026374665L;
+
+  private MessageParser<JSONObject> messageParser;
+  private MessageFilter<JSONObject> filter;
+
+  public ParserComponent(
+      MessageParser<JSONObject> messageParser,
+      MessageFilter<JSONObject> filter) {
+    this.messageParser = messageParser;
+    this.filter = filter;
+  }
+
+  public MessageParser<JSONObject> getMessageParser() {
+    return messageParser;
+  }
+
+  public MessageFilter<JSONObject> getFilter() {
+    return filter;
+  }
+
+  public void setMessageParser(
+      MessageParser<JSONObject> messageParser) {
+    this.messageParser = messageParser;
+  }
+
+  public void setFilter(
+      MessageFilter<JSONObject> filter) {
+    this.filter = filter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
deleted file mode 100644
index 32d56b9..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.topology;
-
-import java.io.Serializable;
-import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.json.simple.JSONObject;
-
-public class ParserComponents implements Serializable {
-  private static final long serialVersionUID = 7880346740026374665L;
-
-  private MessageParser<JSONObject> messageParser;
-  private MessageFilter<JSONObject> filter;
-  private WriterHandler writer;
-
-  public ParserComponents(
-      MessageParser<JSONObject> messageParser,
-      MessageFilter<JSONObject> filter,
-      WriterHandler writer) {
-    this.messageParser = messageParser;
-    this.filter = filter;
-    this.writer = writer;
-  }
-
-  public MessageParser<JSONObject> getMessageParser() {
-    return messageParser;
-  }
-
-  public MessageFilter<JSONObject> getFilter() {
-    return filter;
-  }
-
-  public WriterHandler getWriter() {
-    return writer;
-  }
-
-  public void setMessageParser(
-      MessageParser<JSONObject> messageParser) {
-    this.messageParser = messageParser;
-  }
-
-  public void setFilter(
-      MessageFilter<JSONObject> filter) {
-    this.filter = filter;
-  }
-
-  public void setWriter(WriterHandler writer) {
-    this.writer = writer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d20e1a5..9dc7b88 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,11 +21,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.Constants;
@@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -268,29 +266,14 @@ public class ParserTopologyBuilder {
                                               Optional<String> securityProtocol,
                                               ParserConfigurations configs,
                                               Optional<String> outputTopic) {
-
-    Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+    Map<String, WriterHandler> writerConfigs = new HashMap<>();
     for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
       String sensorType = entry.getKey();
       SensorParserConfig parserConfig = entry.getValue();
-      // create message parser
-      MessageParser<JSONObject> parser = ReflectionUtils
-          .createInstance(parserConfig.getParserClassName());
-      parser.configure(parserConfig.getParserConfig());
-
-      // create message filter
-      MessageFilter<JSONObject> filter = null;
-      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
-        filter = Filters.get(
-            parserConfig.getFilterClassName(),
-            parserConfig.getParserConfig()
-        );
-      }
 
       // create a writer
       AbstractWriter writer;
       if (parserConfig.getWriterClassName() == null) {
-
         // if not configured, use a sensible default
         writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
             .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
@@ -304,16 +287,10 @@ public class ParserTopologyBuilder {
 
       // create a writer handler
       WriterHandler writerHandler = createWriterHandler(writer);
-
-      ParserComponents components = new ParserComponents(
-         parser,
-         filter,
-         writerHandler
-      );
-      parserBoltConfigs.put(sensorType, components);
+      writerConfigs.put(sensorType, writerHandler);
     }
 
-    return new ParserBolt(zookeeperUrl, parserBoltConfigs);
+    return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
index 178719b..a58e0c9 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,117 +27,114 @@ import java.util.Iterator;
 
 public class GrokWebSphereParser extends GrokParser {
 
-       private static final long serialVersionUID = 4860439408055777358L;
+  private static final long serialVersionUID = 4860439408055777358L;
 
-       @Override
-       protected long formatTimestamp(Object value) {
-               long epochTimestamp = System.currentTimeMillis();
-               if (value != null) {
-                       try {
-                               epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR)  + " " + value);
-                       } catch (ParseException e) {
-                               //default to current time
-                       }
-               }
-               return epochTimestamp;
-       }
+  @Override
+  protected long formatTimestamp(Object value) {
+    long epochTimestamp = System.currentTimeMillis();
+    if (value != null) {
+      try {
+        epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value);
+      } catch (ParseException e) {
+        //default to current time
+      }
+    }
+    return epochTimestamp;
+  }
 
-       @Override
-       protected void postParse(JSONObject message) {
-               removeEmptyFields(message);
-               message.remove("timestamp_string");
-               if (message.containsKey("message")) {
-                       String messageValue = (String) message.get("message");
-                       if (messageValue.contains("logged into")) {
-                               parseLoginMessage(message);
-                       }
-                       else if (messageValue.contains("logged out")) {
-                               parseLogoutMessage(message);
-                       }
-                       else if (messageValue.contains("rbm(")) {
-                               parseRBMMessage(message);
-                       }
-                       else {
-                               parseOtherMessage(message);
-                       }
-               }
-       }
+  @Override
+  protected void postParse(JSONObject message) {
+    removeEmptyFields(message);
+    message.remove("timestamp_string");
+    if (message.containsKey("message")) {
+      String messageValue = (String) message.get("message");
+      if (messageValue.contains("logged into")) {
+        parseLoginMessage(message);
+      } else if (messageValue.contains("logged out")) {
+        parseLogoutMessage(message);
+      } else if (messageValue.contains("rbm(")) {
+        parseRBMMessage(message);
+      } else {
+        parseOtherMessage(message);
+      }
+    }
+  }
 
-       @SuppressWarnings("unchecked")
-       private void removeEmptyFields(JSONObject json) {
-               Iterator<Object> keyIter = json.keySet().iterator();
-               while (keyIter.hasNext()) {
-                       Object key = keyIter.next();
-                       Object value = json.get(key);
-                       if (null == value || "".equals(value.toString())) {
-                               keyIter.remove();
-                       }
-               }
-       }
+  @SuppressWarnings("unchecked")
+  private void removeEmptyFields(JSONObject json) {
+    Iterator<Object> keyIter = json.keySet().iterator();
+    while (keyIter.hasNext()) {
+      Object key = keyIter.next();
+      Object value = json.get(key);
+      if (null == value || "".equals(value.toString())) {
+        keyIter.remove();
+      }
+    }
+  }
 
-       //Extracts the appropriate fields from login messages
-       @SuppressWarnings("unchecked")
-       private void parseLoginMessage(JSONObject json) {
-               json.put("event_subtype", "login");
-               String message = (String) json.get("message");
-               if (message.contains(":")){
-                       String parts[] = message.split(":");
-                       String user = parts[0];
-                       String ip_src_addr = parts[1];
-                       if (user.contains("user(") && user.contains(")")) {     
-                               user = user.substring(user.indexOf("user(") + "user(".length());
-                               user = user.substring(0, user.indexOf(")"));
-                               json.put("username", user);
-                       }
-                       if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
-                               ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
-                               ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
-                               json.put("ip_src_addr", ip_src_addr);
-                       }
-                       json.remove("message");
-               }
-       }
+  //Extracts the appropriate fields from login messages
+  @SuppressWarnings("unchecked")
+  private void parseLoginMessage(JSONObject json) {
+    json.put("event_subtype", "login");
+    String message = (String) json.get("message");
+    if (message.contains(":")) {
+      String[] parts = message.split(":");
+      String user = parts[0];
+      String ip_src_addr = parts[1];
+      if (user.contains("user(") && user.contains(")")) {
+        user = user.substring(user.indexOf("user(") + "user(".length());
+        user = user.substring(0, user.indexOf(")"));
+        json.put("username", user);
+      }
+      if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
+        ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
+        ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
+        json.put("ip_src_addr", ip_src_addr);
+      }
+      json.remove("message");
+    }
+  }
 
-       //Extracts the appropriate fields from logout messages
-       @SuppressWarnings("unchecked")
-       private void parseLogoutMessage(JSONObject json) {
-               json.put("event_subtype", "logout");
-               String message = (String) json.get("message");
-               if (message.matches(".*'.*'.*'.*'.*")) {
-                       String parts[] = message.split("'");
-                       String ip_src_addr = parts[0];
-                       if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
-                               ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
-                               ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
-                               json.put("ip_src_addr", ip_src_addr);
-                       }
-                       json.put("username", parts[1]);
-                       json.put("security_domain", parts[3]);
-                       json.remove("message");
-               }
-       }
+  //Extracts the appropriate fields from logout messages
+  @SuppressWarnings("unchecked")
+  private void parseLogoutMessage(JSONObject json) {
+    json.put("event_subtype", "logout");
+    String message = (String) json.get("message");
+    if (message.matches(".*'.*'.*'.*'.*")) {
+      String parts[] = message.split("'");
+      String ip_src_addr = parts[0];
+      if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) {
+        ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1);
+        ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]"));
+        json.put("ip_src_addr", ip_src_addr);
+      }
+      json.put("username", parts[1]);
+      json.put("security_domain", parts[3]);
+      json.remove("message");
+    }
+  }
 
-       //Extracts the appropriate fields from RBM messages
-       @SuppressWarnings("unchecked")
-       private void parseRBMMessage(JSONObject json) {
-               String message = (String) json.get("message");
-               if (message.contains("(")) {
-                       json.put("process", message.substring(0, message.indexOf("(")));
-                       if (message.contains(":")) {
-                               json.put("message", 
message.substring(message.indexOf(":") + 2));       
-                       }
-               }
-       }
+  //Extracts the appropriate fields from RBM messages
+  @SuppressWarnings("unchecked")
+  private void parseRBMMessage(JSONObject json) {
+    String message = (String) json.get("message");
+    if (message.contains("(")) {
+      json.put("process", message.substring(0, message.indexOf("(")));
+      if (message.contains(":")) {
+        json.put("message", message.substring(message.indexOf(":") + 2));
+      }
+    }
+  }
 
-       //Extracts the appropriate fields from other messages
-       @SuppressWarnings("unchecked")
-       private void parseOtherMessage(JSONObject json) {
-               String message = (String) json.get("message");
-               if (message.contains("(")) {
-                       json.put("process", message.substring(0, message.indexOf("(")));
-                       if (message.contains(":")) {
-                               json.put("message", 
message.substring(message.indexOf(":") + 2));       
-                       }
-               }
-       }
+  //Extracts the appropriate fields from other messages
+  @SuppressWarnings("unchecked")
+  private void parseOtherMessage(JSONObject json) {
+    String message = (String) json.get("message");
+    if (message.contains("(")) {
+      json.put("process", message.substring(0, message.indexOf("(")));
+      if (message.contains(":")) {
+        json.put("message", message.substring(message.indexOf(":") + 2));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index 8441409..2f3784a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -42,8 +42,8 @@ public class FiltersTest {
         put("filter.query", "exists(foo)");
       }};
       MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
-      Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
-      Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
+      Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
index 1a50dea..24386fa 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.parsers;
 
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
@@ -30,6 +31,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public abstract class GrokParserTest {
 
@@ -53,8 +55,10 @@ public abstract class GrokParserTest {
 
       JSONObject expected = (JSONObject) jsonParser.parse(e.getValue());
       byte[] rawMessage = e.getKey().getBytes();
-
-      List<JSONObject> parsedList = grokParser.parse(rawMessage);
+      Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage);
+      Assert.assertNotNull(resultOptional);
+      Assert.assertTrue(resultOptional.isPresent());
+      List<JSONObject> parsedList = resultOptional.get().getMessages();
       Assert.assertEquals(1, parsedList.size());
       compare(expected, parsedList.get(0));
     }
@@ -92,4 +96,5 @@ public abstract class GrokParserTest {
   public abstract List<String> getTimeFields();
   public abstract String getDateFormat();
   public abstract String getTimestampField();
+  public abstract String getMultiLine();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
index 9769baa..4842a1f 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java
@@ -19,68 +19,118 @@
 package org.apache.metron.parsers;
 
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.interfaces.MessageParserResult;
+import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 public class MessageParserTest {
-  @Test
-  public void testNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
-      @Override
-      public void init() {
 
-      }
+  abstract class TestMessageParser implements MessageParser<JSONObject> {
+    @Override
+    public void init() {
+    }
 
-      @Override
-      public List parse(byte[] rawMessage) {
-        return null;
-      }
+    @Override
+    public boolean validate(JSONObject message) {
+      return false;
+    }
 
-      @Override
-      public boolean validate(Object message) {
-        return false;
-      }
+    @Override
+    public void configure(Map<String, Object> config) {
 
-      @Override
-      public void configure(Map<String, Object> config) {
+    }
+  }
 
+  @Test
+  public void testNullable() throws Exception {
+    MessageParser parser = new TestMessageParser() {
+      @Override
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return null;
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Assert.assertFalse(parser.parseOptional(null).isPresent());
+    Assert.assertNotNull(parser.parseOptionalResult(null));
+    Assert.assertFalse(parser.parseOptionalResult(null).isPresent());
   }
 
   @Test
   public void testNotNullable() throws Exception {
-    MessageParser parser = new MessageParser() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void init() {
-
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return new ArrayList<>();
       }
+    };
+    Assert.assertNotNull(parser.parseOptionalResult(null));
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null);
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(0, ret.get().getMessages().size());
+  }
 
+  @Test
+  public void testParse() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public List parse(byte[] rawMessage) {
-        return new ArrayList<>();
+      public List<JSONObject> parse(byte[] rawMessage) {
+        return Collections.singletonList(message);
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseOptional() {
+    JSONObject message = new JSONObject();
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public boolean validate(Object message) {
-        return false;
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        return Optional.of(Collections.singletonList(message));
       }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertEquals(1, ret.get().getMessages().size());
+    Assert.assertEquals(message, ret.get().getMessages().get(0));
+  }
 
+  @Test
+  public void testParseException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
       @Override
-      public void configure(Map<String, Object> config) {
+      public List<JSONObject> parse(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
+      }
+    };
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
+    Assert.assertTrue(ret.isPresent());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
+  }
 
+  @Test
+  public void testParseOptionalException() {
+    MessageParser<JSONObject> parser = new TestMessageParser() {
+      @Override
+      public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) {
+        throw new RuntimeException("parse exception");
       }
     };
-    Assert.assertNotNull(parser.parseOptional(null));
-    Optional<List> ret = parser.parseOptional(null);
+    Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes());
     Assert.assertTrue(ret.isPresent());
-    Assert.assertEquals(0, ret.get().size());
+    Assert.assertTrue(ret.get().getMasterThrowable().isPresent());
+    Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage());
   }
+
 }

Reply via email to