http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java index 489eb00..1cf9fb7 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/fireeye/BasicFireEyeParser.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,18 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.metron.parsers.fireeye; import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; -import org.apache.commons.lang3.StringUtils; -import org.apache.metron.parsers.utils.ParserUtils; -import org.apache.metron.parsers.BasicParser; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; @@ -34,189 +30,162 @@ import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.parsers.BasicParser; +import org.apache.metron.parsers.utils.ParserUtils; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class BasicFireEyeParser extends BasicParser { - - private static final long serialVersionUID = 6328907550159134550L; - protected static final Logger LOG = LoggerFactory - .getLogger(BasicFireEyeParser.class); - - - String tsRegex ="([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)"; - - - Pattern tsPattern = Pattern.compile(tsRegex); - // private transient static MetronGrok grok; - // private transient static InputStream pattern_url; - - public BasicFireEyeParser() throws Exception { - // pattern_url = getClass().getClassLoader().getResourceAsStream( - // "patterns/fireeye"); - // - // File file = ParserUtils.stream2file(pattern_url); - // grok = MetronGrok.create(file.getPath()); - // - // grok.compile("%{FIREEYE_BASE}"); - } - - @Override - public void configure(Map<String, Object> parserConfig) { - - } - - @Override - public void init() { - - } - - @Override - public List<JSONObject> parse(byte[] raw_message) { - String toParse = ""; - List<JSONObject> messages = new ArrayList<>(); - try { - - toParse = new String(raw_message, "UTF-8"); - - // String[] mTokens = toParse.split(" "); - - String positveIntPattern = "<[1-9][0-9]*>"; - Pattern p = Pattern.compile(positveIntPattern); - Matcher m = p.matcher(toParse); - - String delimiter = ""; - - while (m.find()) { - delimiter = m.group(); - - } - - if (!StringUtils.isBlank(delimiter)) { - String[] tokens = toParse.split(delimiter); - - if (tokens.length > 1) - toParse = delimiter + tokens[1]; - - } - - JSONObject toReturn = parseMessage(toParse); - - toReturn.put("timestamp", getTimeStamp(toParse,delimiter)); - messages.add(toReturn); - return messages; - - } catch (Exception e) { - e.printStackTrace(); - return null; - } - - } - - private long getTimeStamp(String toParse,String delimiter) throws ParseException { - - long ts = 0; - String month = null; - String day = null; - String time = null; - Matcher tsMatcher = tsPattern.matcher(toParse); - if (tsMatcher.find()) { - month = tsMatcher.group(1); - day = tsMatcher.group(2); - time = tsMatcher.group(3); - ts = ParserUtils.convertToEpoch(month, day, time, true); - } else { - LOG.warn("Unable to find timestamp in message: {}", toParse); - } - - return ts; - } - - private JSONObject parseMessage(String toParse) { - - // System.out.println("Received message: " + toParse); - - // MetronMatch gm = grok.match(toParse); - // gm.captures(); - - JSONObject toReturn = new JSONObject(); - //toParse = toParse.replaceAll(" ", " "); - String[] mTokens = toParse.split("\\s+"); - //mTokens = toParse.split(" "); - - // toReturn.putAll(gm.toMap()); - - String id = mTokens[4]; - - // We are not parsing the fedata for multi part message as we cannot - // determine how we can split the message and how many multi part - // messages can there be. - // The message itself will be stored in the response. - - String[] tokens = id.split("\\."); - if (tokens.length == 2) { - - String[] array = Arrays.copyOfRange(mTokens, 1, mTokens.length - 1); - String syslog = Joiner.on(" ").join(array); - - Multimap<String, String> multiMap = formatMain(syslog); - - for (String key : multiMap.keySet()) { - - String value = Joiner.on(",").join(multiMap.get(key)); - toReturn.put(key, value.trim()); - } - - } - - toReturn.put("original_string", toParse); - - String ip_src_addr = (String) toReturn.get("dvc"); - String ip_src_port = (String) toReturn.get("src_port"); - String ip_dst_addr = (String) toReturn.get("dst_ip"); - String ip_dst_port = (String) toReturn.get("dst_port"); - - if (ip_src_addr != null) - toReturn.put("ip_src_addr", ip_src_addr); - if (ip_src_port != null) - toReturn.put("ip_src_port", ip_src_port); - if (ip_dst_addr != null) - toReturn.put("ip_dst_addr", ip_dst_addr); - if (ip_dst_port != null) - toReturn.put("ip_dst_port", ip_dst_port); - -// System.out.println(toReturn); - - return toReturn; - } - - private Multimap<String, String> formatMain(String in) { - Multimap<String, String> multiMap = ArrayListMultimap.create(); - String input = in.replaceAll("cn3", "dst_port") - .replaceAll("cs5", "cncHost").replaceAll("proto", "protocol") - .replaceAll("rt=", "timestamp=").replaceAll("cs1", "malware") - .replaceAll("dst=", "dst_ip=") - .replaceAll("shost", "src_hostname") - .replaceAll("dmac", "dst_mac").replaceAll("smac", "src_mac") - .replaceAll("spt", "src_port") - .replaceAll("\\bsrc\\b", "src_ip"); - String[] tokens = input.split("\\|"); - - if (tokens.length > 0) { - String message = tokens[tokens.length - 1]; - - String pattern = "([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) "; - Pattern p = Pattern.compile(pattern); - Matcher m = p.matcher(message); - - while (m.find()) { - String[] str = m.group().split("="); - multiMap.put(str[0], str[1]); - - } - } - return multiMap; - } - +public class BasicFireEyeParser extends BasicParser { + private static final long serialVersionUID = 6328907550159134550L; + protected static final Logger LOG = LoggerFactory + .getLogger(MethodHandles.lookup().lookupClass()); + + + private static final String tsRegex = "([a-zA-Z]{3})\\s+(\\d+)\\s+(\\d+\\:\\d+\\:\\d+)" + + "\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+)"; + private static final Pattern tsPattern = Pattern.compile(tsRegex); + private static final String syslogPriorityRegex = "<[1-9][0-9]*>"; + private static final Pattern syslogPriorityPattern = Pattern.compile(syslogPriorityRegex); + private static final String nvRegex = "([\\w\\d]+)=([^=]*)(?=\\s*\\w+=|\\s*$) "; + private static final Pattern nvPattern = Pattern.compile(nvRegex); + + @Override + public void configure(Map<String, Object> parserConfig) {} + + @Override + public void init() {} + + + @Override + @SuppressWarnings("unchecked") + public List<JSONObject> parse(byte[] rawMessage) { + String toParse; + List<JSONObject> messages = new ArrayList<>(); + try { + + toParse = new String(rawMessage, StandardCharsets.UTF_8); + + // because we support what is basically a malformed syslog 3164 message having + // some form of text before the PRIORITY, we need to use the priority as + // a delimiter + Matcher m = syslogPriorityPattern.matcher(toParse); + + String delimiter = ""; + + while (m.find()) { + delimiter = m.group(); + } + + if (!StringUtils.isBlank(delimiter)) { + String[] tokens = toParse.split(delimiter); + if (tokens.length > 1) { + toParse = delimiter + tokens[1]; + } + } + + // parse the main message + JSONObject toReturn = parseMessage(toParse); + toReturn.put("timestamp", getTimeStamp(toParse)); + messages.add(toReturn); + return messages; + } catch (Exception e) { + String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage(); + LOG.error(message, e); + throw new IllegalStateException(message, e); + } + } + + private long getTimeStamp(String toParse) throws ParseException { + long timestamp = 0; + String month; + String day; + String time; + Matcher tsMatcher = tsPattern.matcher(toParse); + if (tsMatcher.find()) { + month = tsMatcher.group(1); + day = tsMatcher.group(2); + time = tsMatcher.group(3); + timestamp = ParserUtils.convertToEpoch(month, day, time, true); + } else { + LOG.warn("Unable to find timestamp in message: {}", toParse); + } + return timestamp; + } + + @SuppressWarnings("unchecked") + private JSONObject parseMessage(String toParse) { + + JSONObject toReturn = new JSONObject(); + String[] messageTokens = toParse.split("\\s+"); + String id = messageTokens[4]; + + // We are not parsing the fedata for multi part message as we cannot + // determine how we can split the message and how many multi part + // messages can there be. + // The message itself will be stored in the response. + + String[] tokens = id.split("\\."); + if (tokens.length == 2) { + + String[] array = Arrays.copyOfRange(messageTokens, 1, messageTokens.length - 1); + String syslog = Joiner.on(" ").join(array); + + Multimap<String, String> multiMap = formatMain(syslog); + + for (String key : multiMap.keySet()) { + String value = Joiner.on(",").join(multiMap.get(key)); + toReturn.put(key, value.trim()); + } + } + + toReturn.put("original_string", toParse); + + final String ipSrcAddr = (String) toReturn.get("dvc"); + final String ipSrcPort = (String) toReturn.get("src_port"); + final String ipDstDddr = (String) toReturn.get("dst_ip"); + final String ipDstPort = (String) toReturn.get("dst_port"); + + if (ipSrcAddr != null) { + toReturn.put("ip_src_addr", ipSrcAddr); + } + if (ipSrcPort != null) { + toReturn.put("ip_src_port", ipSrcPort); + } + if (ipDstDddr != null) { + toReturn.put("ip_dst_addr", ipDstDddr); + } + if (ipDstPort != null) { + toReturn.put("ip_dst_port", ipDstPort); + } + return toReturn; + } + + private Multimap<String, String> formatMain(String in) { + Multimap<String, String> multiMap = ArrayListMultimap.create(); + String input = in.replaceAll("cn3", "dst_port") + .replaceAll("cs5", "cncHost").replaceAll("proto", "protocol") + .replaceAll("rt=", "timestamp=").replaceAll("cs1", "malware") + .replaceAll("dst=", "dst_ip=") + .replaceAll("shost", "src_hostname") + .replaceAll("dmac", "dst_mac").replaceAll("smac", "src_mac") + .replaceAll("spt", "src_port") + .replaceAll("\\bsrc\\b", "src_ip"); + String[] tokens = input.split("\\|"); + + if (tokens.length > 0) { + String message = tokens[tokens.length - 1]; + Matcher m = nvPattern.matcher(message); + + while (m.find()) { + String[] str = m.group().split("="); + multiMap.put(str[0], str[1]); + } + } + return multiMap; + } }
http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java index b7b91c0..207c070 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java @@ -21,5 +21,5 @@ import org.apache.metron.stellar.dsl.Context; public interface MessageFilter<T> extends Configurable{ - boolean emitTuple(T message, Context context); + boolean emit(T message, Context context); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java index e3b903e..c9f8351 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java @@ -17,7 +17,12 @@ */ package org.apache.metron.parsers.interfaces; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.metron.parsers.DefaultMessageParserResult; + import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,23 +36,47 @@ public interface MessageParser<T> extends Configurable { /** * Take raw data and convert it to a list of messages. * - * @param rawMessage + * @param rawMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ - List<T> parse(byte[] rawMessage); + @Deprecated + default List<T> parse(byte[] rawMessage) { + throw new NotImplementedException("parse is not implemented"); + } /** * Take raw data and convert it to an optional list of messages. - * @param parseMessage + * @param parseMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ + @Deprecated default Optional<List<T>> parseOptional(byte[] parseMessage) { return Optional.ofNullable(parse(parseMessage)); } /** + * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore + * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced + * and the errors. + * @param parseMessage the raw bytes of the message + * @return Optional of {@link MessageParserResult} + */ + default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) { + Optional<MessageParserResult<T>> result = Optional.empty(); + try { + Optional<List<T>> optionalMessages = parseOptional(parseMessage); + if (optionalMessages.isPresent()) { + result = Optional.of(new DefaultMessageParserResult<>(optionalMessages.get())); + } + } catch (Throwable t) { + return Optional.of(new DefaultMessageParserResult<>(t)); + } + return result; + } + + /** * Validate the message to ensure that it's correct. - * @param message + * @param message the message to validate * @return true if the message is valid, false if not */ boolean validate(T message); http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java new file mode 100644 index 0000000..891e94f --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers.interfaces; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Result object MessageParser calls. + * @param <T> + */ +public interface MessageParserResult<T> { + /** + * Returns the Message objects of {@code T} + * @return {@code List} + */ + List<T> getMessages(); + + /** + * Returns a map of raw message objects to the {@code Throwable} they triggered. + * @return {@code Map} + */ + Map<Object,Throwable> getMessageThrowables(); + + /** + * Returns a master {@code Throwable} for a parse call. This represents a complete + * call failure, as opposed to one associated with a message. + * @return {@code Optional}{@code Throwable} + */ + Optional<Throwable> getMasterThrowable(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java index 9051f09..c8e8b62 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/BasicPaloAltoFirewallParser.java @@ -34,21 +34,26 @@ import java.util.regex.Pattern; public class BasicPaloAltoFirewallParser extends BasicParser { - private static boolean empty_attribute( final String s ) { + private static boolean empty_attribute(final String s) { return s == null || s.trim().isEmpty() || s.equals("\"\""); } - private static String unquoted_attribute( String s ) { + private static String unquoted_attribute(String s) { s = s.trim(); - if ( s.startsWith( "\"" ) && s.endsWith( "\"" ) ) - return s.substring( 1, s.length( ) - 1 ); + if (s.startsWith("\"") && s.endsWith("\"")) + return s.substring(1, s.length() - 1); return s; } - private static final Logger _LOG = LoggerFactory.getLogger - (BasicPaloAltoFirewallParser.class); + private static final Logger _LOG = LoggerFactory.getLogger(BasicPaloAltoFirewallParser.class); private static final long serialVersionUID = 3147090149725343999L; + + private static final String LogTypeConfig = "CONFIG"; + private static final String LogTypeSystem = "SYSTEM"; + private static final String LogTypeThreat = "THREAT"; + private static final String LogTypeTraffic = "TRAFFIC"; + public static final String PaloAltoDomain = "palo_alto_domain"; public static final String ReceiveTime = "receive_time"; public static final String SerialNum = "serial"; @@ -101,6 +106,21 @@ public class BasicPaloAltoFirewallParser extends BasicParser { public static final String ParentSessionStartTime = "parent_session_start_time"; public static final String TunnelType = "tunnel_type"; + //System + public static final String EventId = "event_id"; + public static final String Object = "object"; + public static final String Module = "module"; + public static final String Description = "description"; + + //Config + public static final String Command = "command"; + public static final String Admin = "admin"; + public static final String Client = "client"; + public static final String Result = "result"; + public static final String ConfigurationPath = "configuration_path"; + public static final String BeforeChangeDetail = "before_change_detail"; + public static final String AfterChangeDetail = "after_change_detail"; + //Threat public static final String URL = "url"; public static final String HOST = "host"; @@ -113,7 +133,7 @@ public class BasicPaloAltoFirewallParser extends BasicParser { public static final String PCAPID = "pcap_id"; public static final String WFFileDigest = "filedigest"; public static final String WFCloud = "cloud"; - public static final String UserAgent= "user_agent"; + public static final String UserAgent = "user_agent"; public static final String WFFileType = "filetype"; public static final String XForwardedFor = "xff"; public static final String Referer = "referer"; @@ -159,8 +179,6 @@ public class BasicPaloAltoFirewallParser extends BasicParser { toParse = new String(msg, "UTF-8"); _LOG.debug("Received message: {}", toParse); - - parseMessage(toParse, outputMessage); long timestamp = System.currentTimeMillis(); outputMessage.put("timestamp", System.currentTimeMillis()); @@ -178,159 +196,258 @@ public class BasicPaloAltoFirewallParser extends BasicParser { private void parseMessage(String message, JSONObject outputMessage) { String[] tokens = Iterables.toArray(Splitter.on(Pattern.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")).split(message), String.class); + int parser_version = 0; String type = tokens[3].trim(); + //validate log types + if (!type.equals(LogTypeConfig) && + !type.equals(LogTypeThreat) && + !type.equals(LogTypeTraffic) && + !type.equals(LogTypeSystem)) { + throw new UnsupportedOperationException("Unsupported log type."); + } + //populate common objects - if( !empty_attribute( tokens[0] ) ) outputMessage.put(PaloAltoDomain, tokens[0].trim()); - if( !empty_attribute( tokens[1] ) ) outputMessage.put(ReceiveTime, tokens[1].trim()); - if( !empty_attribute( tokens[2] ) ) outputMessage.put(SerialNum, tokens[2].trim()); + if (!empty_attribute(tokens[0])) outputMessage.put(PaloAltoDomain, tokens[0].trim()); + if (!empty_attribute(tokens[1])) outputMessage.put(ReceiveTime, tokens[1].trim()); + if (!empty_attribute(tokens[2])) outputMessage.put(SerialNum, tokens[2].trim()); outputMessage.put(Type, type); - if( !empty_attribute( tokens[4] ) ) outputMessage.put(ThreatContentType, unquoted_attribute(tokens[4])); - if( !empty_attribute( tokens[5] ) ) outputMessage.put(ConfigVersion, tokens[5].trim()); - if( !empty_attribute( tokens[6] ) ) outputMessage.put(GenerateTime, tokens[6].trim()); - if( !empty_attribute( tokens[7] ) ) outputMessage.put(SourceAddress, tokens[7].trim()); - if( !empty_attribute( tokens[8] ) ) outputMessage.put(DestinationAddress, tokens[8].trim()); - if( !empty_attribute( tokens[9] ) ) outputMessage.put(NATSourceIP, tokens[9].trim()); - if( !empty_attribute( tokens[10] ) ) outputMessage.put(NATDestinationIP, tokens[10].trim()); - if( !empty_attribute( tokens[11] ) ) outputMessage.put(Rule, unquoted_attribute(tokens[11])); - if( !empty_attribute( tokens[12] ) ) outputMessage.put(SourceUser, unquoted_attribute(tokens[12])); - if( !empty_attribute( tokens[13] ) ) outputMessage.put(DestinationUser, unquoted_attribute(tokens[13])); - if( !empty_attribute( tokens[14] ) ) outputMessage.put(Application, unquoted_attribute(tokens[14])); - if( !empty_attribute( tokens[15] ) ) outputMessage.put(VirtualSystem, unquoted_attribute(tokens[15])); - if( !empty_attribute( tokens[16] ) ) outputMessage.put(SourceZone, unquoted_attribute(tokens[16])); - if( !empty_attribute( tokens[17] ) ) outputMessage.put(DestinationZone, unquoted_attribute(tokens[17])); - if( !empty_attribute( tokens[18] ) ) outputMessage.put(InboundInterface, unquoted_attribute(tokens[18])); - if( !empty_attribute( tokens[19] ) ) outputMessage.put(OutboundInterface, unquoted_attribute(tokens[19])); - if( !empty_attribute( tokens[20] ) ) outputMessage.put(LogAction, unquoted_attribute(tokens[20])); - if( !empty_attribute( tokens[21] ) ) outputMessage.put(TimeLogged, tokens[21].trim()); - if( !empty_attribute( tokens[22] ) ) outputMessage.put(SessionID, tokens[22].trim()); - if( !empty_attribute( tokens[23] ) ) outputMessage.put(RepeatCount, tokens[23].trim()); - if( !empty_attribute( tokens[24] ) ) outputMessage.put(SourcePort, tokens[24].trim()); - if( !empty_attribute( tokens[25] ) ) outputMessage.put(DestinationPort, tokens[25].trim()); - if( !empty_attribute( tokens[26] ) ) outputMessage.put(NATSourcePort, tokens[26].trim()); - if( !empty_attribute( tokens[27] ) ) outputMessage.put(NATDestinationPort, tokens[27].trim()); - if( !empty_attribute( tokens[28] ) ) outputMessage.put(Flags, tokens[28].trim()); - if( !empty_attribute( tokens[29] ) ) outputMessage.put(IPProtocol, unquoted_attribute(tokens[29])); - if( !empty_attribute( tokens[30] ) ) outputMessage.put(Action, unquoted_attribute(tokens[30])); - - - if ("THREAT".equals(type.toUpperCase())) { - int p1_offset = 0; - if (tokens.length == 45) parser_version = 60; - else if (tokens.length == 53) parser_version = 61; - else if (tokens.length == 61) { - parser_version = 70; - p1_offset = 1; - } - else if (tokens.length == 72) { - parser_version = 80; - p1_offset =1; + if (!empty_attribute(tokens[4])) outputMessage.put(ThreatContentType, unquoted_attribute(tokens[4])); + if (!empty_attribute(tokens[5])) outputMessage.put(ConfigVersion, tokens[5].trim()); + if (!empty_attribute(tokens[6])) outputMessage.put(GenerateTime, tokens[6].trim()); + + if (LogTypeConfig.equals(type.toUpperCase())) { + // There are two fields in custom logs only and they are not in the default format. + // But we need to parse them if they exist + if (tokens.length == 16 || tokens.length == 18) parser_version = 61; + else if (tokens.length == 22 || tokens.length == 24) parser_version = 80; + + if (parser_version >= 61) { + if (!empty_attribute(tokens[7])) outputMessage.put(HOST, tokens[7].trim()); + if (!empty_attribute(tokens[8])) outputMessage.put(VirtualSystem, tokens[8].trim()); + if (!empty_attribute(tokens[9])) outputMessage.put(Command, tokens[9].trim()); + if (!empty_attribute(tokens[10])) outputMessage.put(Admin, tokens[10].trim()); + if (!empty_attribute(tokens[11])) outputMessage.put(Client, unquoted_attribute(tokens[11])); + if (!empty_attribute(tokens[12])) outputMessage.put(Result, unquoted_attribute(tokens[12])); + if (!empty_attribute(tokens[13])) outputMessage.put(ConfigurationPath, unquoted_attribute(tokens[13])); } - outputMessage.put(ParserVersion, parser_version); - if( !empty_attribute( tokens[31] ) ) { - outputMessage.put(URL, unquoted_attribute(tokens[31])); - try { - URL url = new URL(unquoted_attribute(tokens[31])); - outputMessage.put(HOST, url.getHost()); - } catch (MalformedURLException e) { + + if (parser_version == 61) { + if (!empty_attribute(tokens[14])) outputMessage.put(Seqno, unquoted_attribute(tokens[14])); + if (!empty_attribute(tokens[15])) outputMessage.put(ActionFlags, unquoted_attribute(tokens[15])); + if (tokens.length == 18) { + if (!empty_attribute(tokens[16])) + outputMessage.put(BeforeChangeDetail, unquoted_attribute(tokens[16])); + if (!empty_attribute(tokens[17])) + outputMessage.put(AfterChangeDetail, unquoted_attribute(tokens[17])); } } - if( !empty_attribute( tokens[32] ) ) outputMessage.put(ThreatID, tokens[32].trim()); - if( !empty_attribute( tokens[33] ) ) outputMessage.put(Category, unquoted_attribute(tokens[33])); - if( !empty_attribute( tokens[34] ) ) outputMessage.put(Severity, unquoted_attribute(tokens[34])); - if( !empty_attribute( tokens[35] ) ) outputMessage.put(Direction, unquoted_attribute(tokens[35])); - if( !empty_attribute( tokens[36] ) ) outputMessage.put(Seqno, tokens[36].trim()); - if( !empty_attribute( tokens[37] ) ) outputMessage.put(ActionFlags, unquoted_attribute(tokens[37])); - if( !empty_attribute( tokens[38] ) ) outputMessage.put(SourceLocation, unquoted_attribute(tokens[38])); - if( !empty_attribute( tokens[39] ) ) outputMessage.put(DestinationLocation, unquoted_attribute(tokens[39])); - if( !empty_attribute( tokens[41] ) ) outputMessage.put(ContentType, unquoted_attribute(tokens[41])); - if( !empty_attribute( tokens[42] ) ) outputMessage.put(PCAPID, tokens[42].trim()); - if( !empty_attribute( tokens[43] ) ) outputMessage.put(WFFileDigest, unquoted_attribute(tokens[43])); - if( !empty_attribute( tokens[44] ) ) outputMessage.put(WFCloud, unquoted_attribute(tokens[44])); - if ( parser_version >= 61) { - if( !empty_attribute( tokens[(45 + p1_offset)] ) ) outputMessage.put(UserAgent, unquoted_attribute(tokens[(45 + p1_offset)])); - if( !empty_attribute( tokens[(46 + p1_offset)] ) ) outputMessage.put(WFFileType, unquoted_attribute(tokens[(46 + p1_offset)])); - if( !empty_attribute( tokens[(47 + p1_offset)] ) ) outputMessage.put(XForwardedFor, unquoted_attribute(tokens[(47 + p1_offset)])); - if( !empty_attribute( tokens[(48 + p1_offset)] ) ) outputMessage.put(Referer, unquoted_attribute(tokens[(48 + p1_offset)])); - if( !empty_attribute( tokens[(49 + p1_offset)] ) ) outputMessage.put(WFSender, unquoted_attribute(tokens[(49 + p1_offset)])); - if( !empty_attribute( tokens[(50 + p1_offset)] ) ) outputMessage.put(WFSubject, unquoted_attribute(tokens[(50 + p1_offset)])); - if( !empty_attribute( tokens[(51 + p1_offset)] ) ) outputMessage.put(WFRecipient, unquoted_attribute(tokens[(51 + p1_offset)])); - if( !empty_attribute( tokens[(52 + p1_offset)] ) ) outputMessage.put(WFReportID, unquoted_attribute(tokens[(52 + p1_offset)])); - } - if ( parser_version >= 70) { - if( !empty_attribute( tokens[45] ) ) outputMessage.put(URLIndex, tokens[45].trim()); - if( !empty_attribute( tokens[54] ) ) outputMessage.put(DGH1, tokens[54].trim()); - if( !empty_attribute( tokens[55] ) ) outputMessage.put(DGH2, tokens[55].trim()); - if( !empty_attribute( tokens[56] ) ) outputMessage.put(DGH3, tokens[56].trim()); - if( !empty_attribute( tokens[57] ) ) outputMessage.put(DGH4, tokens[57].trim()); - if( !empty_attribute( tokens[58] ) ) outputMessage.put(VSYSName, unquoted_attribute(tokens[58])); - if( !empty_attribute( tokens[59] ) ) outputMessage.put(DeviceName, unquoted_attribute(tokens[59])); - } - if ( parser_version >= 80) { - if( !empty_attribute( tokens[61] ) ) outputMessage.put(SourceVmUuid, tokens[61].trim()); - if( !empty_attribute( tokens[62] ) ) outputMessage.put(DestinationVmUuid, tokens[62].trim()); - if( !empty_attribute( tokens[63] ) ) outputMessage.put(HTTPMethod, tokens[63].trim()); - if( !empty_attribute( tokens[64] ) ) outputMessage.put(TunnelId, tokens[64].trim()); - if( !empty_attribute( tokens[65] ) ) outputMessage.put(MonitorTag, tokens[65].trim()); - if( !empty_attribute( tokens[66] ) ) outputMessage.put(ParentSessionId, tokens[66].trim()); - if( !empty_attribute( tokens[67] ) ) outputMessage.put(ParentSessionStartTime, tokens[67].trim()); - if( !empty_attribute( tokens[68] ) ) outputMessage.put(TunnelType, tokens[68].trim()); - if( !empty_attribute( tokens[69] ) ) outputMessage.put(ThreatCategory, tokens[69].trim()); - if( !empty_attribute( tokens[70] ) ) outputMessage.put(ContentVersion, tokens[70].trim()); - } - if ( parser_version == 0) { - outputMessage.put(Tokens, tokens.length); - } - - } else if ("TRAFFIC".equals(type.toUpperCase())) { - if (tokens.length == 46) parser_version = 60; - else if (tokens.length == 47) parser_version = 61; - else if (tokens.length == 54) parser_version = 70; - else if (tokens.length == 61) parser_version = 80; - outputMessage.put(ParserVersion, parser_version); - if( !empty_attribute( tokens[31] ) ) outputMessage.put(Bytes, tokens[31].trim()); - if( !empty_attribute( tokens[32] ) ) outputMessage.put(BytesSent, tokens[32].trim()); - if( !empty_attribute( tokens[33] ) ) outputMessage.put(BytesReceived, tokens[33].trim()); - if( !empty_attribute( tokens[34] ) ) outputMessage.put(Packets, tokens[34].trim()); - if( !empty_attribute( tokens[35] ) ) outputMessage.put(StartTime, tokens[35].trim()); - if( !empty_attribute( tokens[36] ) ) outputMessage.put(ElapsedTimeInSec, tokens[36].trim()); - if( !empty_attribute( tokens[37] ) ) outputMessage.put(Category, unquoted_attribute(tokens[37])); - if( !empty_attribute( tokens[39] ) ) outputMessage.put(Seqno, tokens[39].trim()); - if( !empty_attribute( tokens[40] ) ) outputMessage.put(ActionFlags, unquoted_attribute(tokens[40])); - if( !empty_attribute( tokens[41] ) ) outputMessage.put(SourceLocation, unquoted_attribute(tokens[41])); - if( !empty_attribute( tokens[42] ) ) outputMessage.put(DestinationLocation, unquoted_attribute(tokens[42])); - if( !empty_attribute( tokens[44] ) ) outputMessage.put(PktsSent, tokens[44].trim()); - if( !empty_attribute( tokens[45] ) ) outputMessage.put(PktsReceived, tokens[45].trim()); - if ( parser_version >= 61) { - if( !empty_attribute( tokens[46] ) ) outputMessage.put(EndReason, unquoted_attribute(tokens[46])); + if (parser_version >= 70) { + int custom_fields_offset = 0; + if (tokens.length == 24) { + if (!empty_attribute(tokens[14])) { + outputMessage.put(BeforeChangeDetail, unquoted_attribute(tokens[14 + custom_fields_offset])); + } + if (!empty_attribute(tokens[15])) { + outputMessage.put(AfterChangeDetail, unquoted_attribute(tokens[15 + custom_fields_offset])); + } + custom_fields_offset = 2; + } + if (!empty_attribute(tokens[14 + custom_fields_offset])) { + outputMessage.put(Seqno, unquoted_attribute(tokens[14 + custom_fields_offset])); + } + if (!empty_attribute(tokens[15 + custom_fields_offset])) { + outputMessage.put(ActionFlags, unquoted_attribute(tokens[15 + custom_fields_offset])); + } + if (!empty_attribute(tokens[16 + custom_fields_offset])) { + outputMessage.put(DGH1, unquoted_attribute(tokens[16 + custom_fields_offset])); + } + if (!empty_attribute(tokens[17 + custom_fields_offset])) { + outputMessage.put(DGH2, unquoted_attribute(tokens[17 + custom_fields_offset])); + } + if (!empty_attribute(tokens[18 + custom_fields_offset])) { + outputMessage.put(DGH3, unquoted_attribute(tokens[18 + custom_fields_offset])); + } + if (!empty_attribute(tokens[19 + custom_fields_offset])) { + outputMessage.put(DGH4, unquoted_attribute(tokens[19 + custom_fields_offset])); + } + if (!empty_attribute(tokens[20 + custom_fields_offset])) { + outputMessage.put(VSYSName, unquoted_attribute(tokens[20 + custom_fields_offset])); + } + if (!empty_attribute(tokens[21 + custom_fields_offset])) { + outputMessage.put(DeviceName, unquoted_attribute(tokens[21 + custom_fields_offset])); + } } - if ( parser_version >= 70) { - if( !empty_attribute( tokens[47] ) ) outputMessage.put(DGH1, tokens[47].trim()); - if( !empty_attribute( tokens[48] ) ) outputMessage.put(DGH2, tokens[48].trim()); - if( !empty_attribute( tokens[49] ) ) outputMessage.put(DGH3, tokens[49].trim()); - if( !empty_attribute( tokens[50] ) ) outputMessage.put(DGH4, tokens[50].trim()); - if( !empty_attribute( tokens[51] ) ) outputMessage.put(VSYSName, unquoted_attribute(tokens[51])); - if( !empty_attribute( tokens[52] ) ) outputMessage.put(DeviceName, unquoted_attribute(tokens[52])); - if( !empty_attribute( tokens[53] ) ) outputMessage.put(ActionSource, unquoted_attribute(tokens[53])); + } else if (LogTypeSystem.equals(type.toUpperCase())) { + if (tokens.length == 17) parser_version = 61; + else if (tokens.length == 23) parser_version = 80; + + if (parser_version >= 61) { + if (!empty_attribute(tokens[7])) outputMessage.put(VirtualSystem, tokens[7].trim()); + if (!empty_attribute(tokens[8])) outputMessage.put(EventId, tokens[8].trim()); + if (!empty_attribute(tokens[9])) outputMessage.put(Object, tokens[9].trim()); + + if (!empty_attribute(tokens[12])) outputMessage.put(Module, tokens[12].trim()); + if (!empty_attribute(tokens[13])) outputMessage.put(Severity, unquoted_attribute(tokens[13])); + if (!empty_attribute(tokens[14])) outputMessage.put(Description, unquoted_attribute(tokens[14])); + if (!empty_attribute(tokens[15])) outputMessage.put(Seqno, unquoted_attribute(tokens[15])); + if (!empty_attribute(tokens[16])) outputMessage.put(ActionFlags, unquoted_attribute(tokens[16])); } - if ( parser_version >= 80) { - if( !empty_attribute( tokens[54] ) ) outputMessage.put(SourceVmUuid, tokens[54].trim()); - if( !empty_attribute( tokens[55] ) ) outputMessage.put(DestinationVmUuid, tokens[55].trim()); - if( !empty_attribute( tokens[56] ) ) outputMessage.put(TunnelId, tokens[56].trim()); - if( !empty_attribute( tokens[57] ) ) outputMessage.put(MonitorTag, tokens[57].trim()); - if( !empty_attribute( tokens[58] ) ) outputMessage.put(ParentSessionId, tokens[58].trim()); - if( !empty_attribute( tokens[59] ) ) outputMessage.put(ParentSessionStartTime, tokens[59].trim()); - if( !empty_attribute( tokens[60] ) ) outputMessage.put(TunnelType, tokens[60].trim()); + + if (parser_version == 80) { + if (!empty_attribute(tokens[17])) outputMessage.put(DGH1, tokens[17].trim()); + if (!empty_attribute(tokens[18])) outputMessage.put(DGH2, tokens[18].trim()); + if (!empty_attribute(tokens[19])) outputMessage.put(DGH3, tokens[19].trim()); + if (!empty_attribute(tokens[20])) outputMessage.put(DGH4, tokens[20].trim()); + if (!empty_attribute(tokens[21])) outputMessage.put(VSYSName, unquoted_attribute(tokens[21])); + if (!empty_attribute(tokens[22])) outputMessage.put(DeviceName, unquoted_attribute(tokens[22])); } - if ( parser_version == 0) { - outputMessage.put(Tokens, tokens.length); + } else if (LogTypeThreat.equals(type.toUpperCase()) || + LogTypeTraffic.equals(type.toUpperCase())) { + if (!empty_attribute(tokens[7])) outputMessage.put(SourceAddress, tokens[7].trim()); + if (!empty_attribute(tokens[8])) outputMessage.put(DestinationAddress, tokens[8].trim()); + if (!empty_attribute(tokens[9])) outputMessage.put(NATSourceIP, tokens[9].trim()); + if (!empty_attribute(tokens[10])) outputMessage.put(NATDestinationIP, tokens[10].trim()); + if (!empty_attribute(tokens[11])) outputMessage.put(Rule, unquoted_attribute(tokens[11])); + if (!empty_attribute(tokens[12])) outputMessage.put(SourceUser, unquoted_attribute(tokens[12])); + if (!empty_attribute(tokens[13])) outputMessage.put(DestinationUser, unquoted_attribute(tokens[13])); + if (!empty_attribute(tokens[14])) outputMessage.put(Application, unquoted_attribute(tokens[14])); + if (!empty_attribute(tokens[15])) outputMessage.put(VirtualSystem, unquoted_attribute(tokens[15])); + if (!empty_attribute(tokens[16])) outputMessage.put(SourceZone, unquoted_attribute(tokens[16])); + if (!empty_attribute(tokens[17])) outputMessage.put(DestinationZone, unquoted_attribute(tokens[17])); + if (!empty_attribute(tokens[18])) outputMessage.put(InboundInterface, unquoted_attribute(tokens[18])); + if (!empty_attribute(tokens[19])) outputMessage.put(OutboundInterface, unquoted_attribute(tokens[19])); + if (!empty_attribute(tokens[20])) outputMessage.put(LogAction, unquoted_attribute(tokens[20])); + if (!empty_attribute(tokens[21])) outputMessage.put(TimeLogged, tokens[21].trim()); + if (!empty_attribute(tokens[22])) outputMessage.put(SessionID, tokens[22].trim()); + if (!empty_attribute(tokens[23])) outputMessage.put(RepeatCount, tokens[23].trim()); + if (!empty_attribute(tokens[24])) outputMessage.put(SourcePort, tokens[24].trim()); + if (!empty_attribute(tokens[25])) outputMessage.put(DestinationPort, tokens[25].trim()); + if (!empty_attribute(tokens[26])) outputMessage.put(NATSourcePort, tokens[26].trim()); + if (!empty_attribute(tokens[27])) outputMessage.put(NATDestinationPort, tokens[27].trim()); + if (!empty_attribute(tokens[28])) outputMessage.put(Flags, tokens[28].trim()); + if (!empty_attribute(tokens[29])) outputMessage.put(IPProtocol, unquoted_attribute(tokens[29])); + if (!empty_attribute(tokens[30])) outputMessage.put(Action, unquoted_attribute(tokens[30])); + + if (LogTypeThreat.equals(type.toUpperCase())) { + int p1_offset = 0; + if (tokens.length == 45) parser_version = 60; + else if (tokens.length == 53) parser_version = 61; + else if (tokens.length == 61) { + parser_version = 70; + p1_offset = 1; + } else if (tokens.length == 72) { + parser_version = 80; + p1_offset = 1; + } + if (!empty_attribute(tokens[31])) { + outputMessage.put(URL, unquoted_attribute(tokens[31])); + try { + URL url = new URL(unquoted_attribute(tokens[31])); + outputMessage.put(HOST, url.getHost()); + } catch (MalformedURLException e) { + } + } + if (!empty_attribute(tokens[32])) outputMessage.put(ThreatID, tokens[32].trim()); + if (!empty_attribute(tokens[33])) outputMessage.put(Category, unquoted_attribute(tokens[33])); + if (!empty_attribute(tokens[34])) outputMessage.put(Severity, unquoted_attribute(tokens[34])); + if (!empty_attribute(tokens[35])) outputMessage.put(Direction, unquoted_attribute(tokens[35])); + if (!empty_attribute(tokens[36])) outputMessage.put(Seqno, tokens[36].trim()); + if (!empty_attribute(tokens[37])) outputMessage.put(ActionFlags, unquoted_attribute(tokens[37])); + if (!empty_attribute(tokens[38])) outputMessage.put(SourceLocation, unquoted_attribute(tokens[38])); + if (!empty_attribute(tokens[39])) + outputMessage.put(DestinationLocation, unquoted_attribute(tokens[39])); + if (!empty_attribute(tokens[41])) outputMessage.put(ContentType, unquoted_attribute(tokens[41])); + if (!empty_attribute(tokens[42])) outputMessage.put(PCAPID, tokens[42].trim()); + if (!empty_attribute(tokens[43])) outputMessage.put(WFFileDigest, unquoted_attribute(tokens[43])); + if (!empty_attribute(tokens[44])) outputMessage.put(WFCloud, unquoted_attribute(tokens[44])); + if (parser_version >= 61) { + if (!empty_attribute(tokens[(45 + p1_offset)])) + outputMessage.put(UserAgent, unquoted_attribute(tokens[(45 + p1_offset)])); + if (!empty_attribute(tokens[(46 + p1_offset)])) + outputMessage.put(WFFileType, unquoted_attribute(tokens[(46 + p1_offset)])); + if (!empty_attribute(tokens[(47 + p1_offset)])) + outputMessage.put(XForwardedFor, unquoted_attribute(tokens[(47 + p1_offset)])); + if (!empty_attribute(tokens[(48 + p1_offset)])) + outputMessage.put(Referer, unquoted_attribute(tokens[(48 + p1_offset)])); + if (!empty_attribute(tokens[(49 + p1_offset)])) + outputMessage.put(WFSender, unquoted_attribute(tokens[(49 + p1_offset)])); + if (!empty_attribute(tokens[(50 + p1_offset)])) + outputMessage.put(WFSubject, unquoted_attribute(tokens[(50 + p1_offset)])); + if (!empty_attribute(tokens[(51 + p1_offset)])) + outputMessage.put(WFRecipient, unquoted_attribute(tokens[(51 + p1_offset)])); + if (!empty_attribute(tokens[(52 + p1_offset)])) + outputMessage.put(WFReportID, unquoted_attribute(tokens[(52 + p1_offset)])); + } + if (parser_version >= 70) { + if (!empty_attribute(tokens[45])) outputMessage.put(URLIndex, tokens[45].trim()); + if (!empty_attribute(tokens[54])) outputMessage.put(DGH1, tokens[54].trim()); + if (!empty_attribute(tokens[55])) outputMessage.put(DGH2, tokens[55].trim()); + if (!empty_attribute(tokens[56])) outputMessage.put(DGH3, tokens[56].trim()); + if (!empty_attribute(tokens[57])) outputMessage.put(DGH4, tokens[57].trim()); + if (!empty_attribute(tokens[58])) outputMessage.put(VSYSName, unquoted_attribute(tokens[58])); + if (!empty_attribute(tokens[59])) outputMessage.put(DeviceName, unquoted_attribute(tokens[59])); + } + if (parser_version >= 80) { + if (!empty_attribute(tokens[61])) outputMessage.put(SourceVmUuid, tokens[61].trim()); + if (!empty_attribute(tokens[62])) outputMessage.put(DestinationVmUuid, tokens[62].trim()); + if (!empty_attribute(tokens[63])) outputMessage.put(HTTPMethod, tokens[63].trim()); + if (!empty_attribute(tokens[64])) outputMessage.put(TunnelId, tokens[64].trim()); + if (!empty_attribute(tokens[65])) outputMessage.put(MonitorTag, tokens[65].trim()); + if (!empty_attribute(tokens[66])) outputMessage.put(ParentSessionId, tokens[66].trim()); + if (!empty_attribute(tokens[67])) outputMessage.put(ParentSessionStartTime, tokens[67].trim()); + if (!empty_attribute(tokens[68])) outputMessage.put(TunnelType, tokens[68].trim()); + if (!empty_attribute(tokens[69])) outputMessage.put(ThreatCategory, tokens[69].trim()); + if (!empty_attribute(tokens[70])) outputMessage.put(ContentVersion, tokens[70].trim()); + } + } else if (LogTypeTraffic.equals(type.toUpperCase())) { + if (tokens.length == 46) parser_version = 60; + else if (tokens.length == 47) parser_version = 61; + else if (tokens.length == 54) parser_version = 70; + else if (tokens.length == 61) parser_version = 80; + if (!empty_attribute(tokens[31])) outputMessage.put(Bytes, tokens[31].trim()); + if (!empty_attribute(tokens[32])) outputMessage.put(BytesSent, tokens[32].trim()); + if (!empty_attribute(tokens[33])) outputMessage.put(BytesReceived, tokens[33].trim()); + if (!empty_attribute(tokens[34])) outputMessage.put(Packets, tokens[34].trim()); + if (!empty_attribute(tokens[35])) outputMessage.put(StartTime, tokens[35].trim()); + if (!empty_attribute(tokens[36])) outputMessage.put(ElapsedTimeInSec, tokens[36].trim()); + if (!empty_attribute(tokens[37])) outputMessage.put(Category, unquoted_attribute(tokens[37])); + if (!empty_attribute(tokens[39])) outputMessage.put(Seqno, tokens[39].trim()); + if (!empty_attribute(tokens[40])) outputMessage.put(ActionFlags, unquoted_attribute(tokens[40])); + if (!empty_attribute(tokens[41])) outputMessage.put(SourceLocation, unquoted_attribute(tokens[41])); + if (!empty_attribute(tokens[42])) + outputMessage.put(DestinationLocation, unquoted_attribute(tokens[42])); + if (!empty_attribute(tokens[44])) outputMessage.put(PktsSent, tokens[44].trim()); + if (!empty_attribute(tokens[45])) outputMessage.put(PktsReceived, tokens[45].trim()); + if (parser_version >= 61) { + if (!empty_attribute(tokens[46])) outputMessage.put(EndReason, unquoted_attribute(tokens[46])); + } + if (parser_version >= 70) { + if (!empty_attribute(tokens[47])) outputMessage.put(DGH1, tokens[47].trim()); + if (!empty_attribute(tokens[48])) outputMessage.put(DGH2, tokens[48].trim()); + if (!empty_attribute(tokens[49])) outputMessage.put(DGH3, tokens[49].trim()); + if (!empty_attribute(tokens[50])) outputMessage.put(DGH4, tokens[50].trim()); + if (!empty_attribute(tokens[51])) outputMessage.put(VSYSName, unquoted_attribute(tokens[51])); + if (!empty_attribute(tokens[52])) outputMessage.put(DeviceName, unquoted_attribute(tokens[52])); + if (!empty_attribute(tokens[53])) outputMessage.put(ActionSource, unquoted_attribute(tokens[53])); + } + if (parser_version >= 80) { + if (!empty_attribute(tokens[54])) outputMessage.put(SourceVmUuid, tokens[54].trim()); + if (!empty_attribute(tokens[55])) outputMessage.put(DestinationVmUuid, tokens[55].trim()); + if (!empty_attribute(tokens[56])) outputMessage.put(TunnelId, tokens[56].trim()); + if (!empty_attribute(tokens[57])) outputMessage.put(MonitorTag, tokens[57].trim()); + if (!empty_attribute(tokens[58])) outputMessage.put(ParentSessionId, tokens[58].trim()); + if (!empty_attribute(tokens[59])) outputMessage.put(ParentSessionStartTime, tokens[59].trim()); + if (!empty_attribute(tokens[60])) outputMessage.put(TunnelType, tokens[60].trim()); + } } } - + outputMessage.put(ParserVersion, parser_version); + if (parser_version == 0) { + outputMessage.put(Tokens, tokens.length); + } } - - } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md new file mode 100644 index 0000000..b6a1230 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/paloalto/README.md @@ -0,0 +1,32 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +# BasicPaloAltoFirewallParser +## Introduction +The parser is able to parse logs generated by Palo Alto firewall devices powered by Pan OS. The supported log format is CSV. + +The supported log types and versions + +| Log type | Pan OS version | +|----------|----------------| +| Traffic | 6.0, 6.1, 7.0, 8.0 | +| Threat | 6.0, 6.1, 7.0, 8.0 | +| Config | 6.1, 7.0, 8.0 | +| System | 6.1, 7.0, 8.0 | + + + http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java index e3ad941..5b62e85 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java @@ -25,23 +25,35 @@ import com.github.palindromicity.syslog.SyslogParserBuilder; import com.github.palindromicity.syslog.dsl.SyslogFieldKeys; import org.apache.commons.lang3.StringUtils; import org.apache.metron.parsers.BasicParser; +import org.apache.metron.parsers.DefaultMessageParserResult; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.Reader; +import java.io.Serializable; import java.io.StringReader; +import java.lang.invoke.MethodHandles; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Parser for well structured RFC 5424 messages. */ -public class Syslog5424Parser extends BasicParser { +public class Syslog5424Parser implements MessageParser<JSONObject>, Serializable { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String NIL_POLICY_CONFIG = "nilPolicy"; private transient SyslogParser syslogParser; @@ -62,15 +74,31 @@ public class Syslog5424Parser extends BasicParser { } @Override + public boolean validate(JSONObject message) { + JSONObject value = message; + if (!(value.containsKey("original_string"))) { + LOG.trace("[Metron] Message does not have original_string: {}", message); + return false; + } else if (!(value.containsKey("timestamp"))) { + LOG.trace("[Metron] Message does not have timestamp: {}", message); + return false; + } else { + LOG.trace("[Metron] Message conforms to schema: {}", message); + return true; + } + } + + @Override @SuppressWarnings("unchecked") - public List<JSONObject> parse(byte[] rawMessage) { + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { try { if (rawMessage == null || rawMessage.length == 0) { - return null; + return Optional.empty(); } String originalString = new String(rawMessage); List<JSONObject> returnList = new ArrayList<>(); + Map<Object,Throwable> errorMap = new HashMap<>(); try (Reader reader = new BufferedReader(new StringReader(originalString))) { syslogParser.parseLines(reader, (m) -> { JSONObject jsonObject = new JSONObject(m); @@ -79,14 +107,14 @@ public class Syslog5424Parser extends BasicParser { jsonObject.put("original_string", originalString); setTimestamp(jsonObject); returnList.add(jsonObject); - }); + },errorMap::put); - return returnList; + return Optional.of(new DefaultMessageParserResult<JSONObject>(returnList,errorMap)); } - } catch (Exception e) { - String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage(); + } catch (IOException e) { + String message = "Unable to read buffer " + new String(rawMessage) + ": " + e.getMessage(); LOG.error(message, e); - throw new IllegalStateException(message, e); + return Optional.of(new DefaultMessageParserResult<JSONObject>( new IllegalStateException(message, e))); } } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java new file mode 100644 index 0000000..eb5ff9f --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponent.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers.topology; + +import org.apache.metron.parsers.interfaces.MessageFilter; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.json.simple.JSONObject; + +import java.io.Serializable; + +public class ParserComponent implements Serializable { + private static final long serialVersionUID = 7880346740026374665L; + + private MessageParser<JSONObject> messageParser; + private MessageFilter<JSONObject> filter; + + public ParserComponent( + MessageParser<JSONObject> messageParser, + MessageFilter<JSONObject> filter) { + this.messageParser = messageParser; + this.filter = filter; + } + + public MessageParser<JSONObject> getMessageParser() { + return messageParser; + } + + public MessageFilter<JSONObject> getFilter() { + return filter; + } + + public void setMessageParser( + MessageParser<JSONObject> messageParser) { + this.messageParser = messageParser; + } + + public void setFilter( + MessageFilter<JSONObject> filter) { + this.filter = filter; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java deleted file mode 100644 index 32d56b9..0000000 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.parsers.topology; - -import java.io.Serializable; -import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.json.simple.JSONObject; - -public class ParserComponents implements Serializable { - private static final long serialVersionUID = 7880346740026374665L; - - private MessageParser<JSONObject> messageParser; - private MessageFilter<JSONObject> filter; - private WriterHandler writer; - - public ParserComponents( - MessageParser<JSONObject> messageParser, - MessageFilter<JSONObject> filter, - WriterHandler writer) { - this.messageParser = messageParser; - this.filter = filter; - this.writer = writer; - } - - public MessageParser<JSONObject> getMessageParser() { - return messageParser; - } - - public MessageFilter<JSONObject> getFilter() { - return filter; - } - - public WriterHandler getWriter() { - return writer; - } - - public void setMessageParser( - MessageParser<JSONObject> messageParser) { - this.messageParser = messageParser; - } - - public void setFilter( - MessageFilter<JSONObject> filter) { - this.filter = filter; - } - - public void setWriter(WriterHandler writer) { - this.writer = writer; - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index d20e1a5..9dc7b88 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,11 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.metron.common.Constants; @@ -37,12 +37,10 @@ import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.common.utils.ReflectionUtils; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.MessageWriter; +import org.apache.metron.parsers.ParserRunnerImpl; import org.apache.metron.parsers.bolt.ParserBolt; import org.apache.metron.parsers.bolt.WriterBolt; import org.apache.metron.parsers.bolt.WriterHandler; -import org.apache.metron.parsers.filters.Filters; -import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; @@ -268,29 +266,14 @@ public class ParserTopologyBuilder { Optional<String> securityProtocol, ParserConfigurations configs, Optional<String> outputTopic) { - - Map<String, ParserComponents> parserBoltConfigs = new HashMap<>(); + Map<String, WriterHandler> writerConfigs = new HashMap<>(); for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) { String sensorType = entry.getKey(); SensorParserConfig parserConfig = entry.getValue(); - // create message parser - MessageParser<JSONObject> parser = ReflectionUtils - .createInstance(parserConfig.getParserClassName()); - parser.configure(parserConfig.getParserConfig()); - - // create message filter - MessageFilter<JSONObject> filter = null; - if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) { - filter = Filters.get( - parserConfig.getFilterClassName(), - parserConfig.getParserConfig() - ); - } // create a writer AbstractWriter writer; if (parserConfig.getWriterClassName() == null) { - // if not configured, use a sensible default writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol) .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)); @@ -304,16 +287,10 @@ public class ParserTopologyBuilder { // create a writer handler WriterHandler writerHandler = createWriterHandler(writer); - - ParserComponents components = new ParserComponents( - parser, - filter, - writerHandler - ); - parserBoltConfigs.put(sensorType, components); + writerConfigs.put(sensorType, writerHandler); } - return new ParserBolt(zookeeperUrl, parserBoltConfigs); + return new ParserBolt(zookeeperUrl, new ParserRunnerImpl(new HashSet<>(sensorTypeToParserConfig.keySet())), writerConfigs); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java index 178719b..a58e0c9 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,117 +27,114 @@ import java.util.Iterator; public class GrokWebSphereParser extends GrokParser { - private static final long serialVersionUID = 4860439408055777358L; + private static final long serialVersionUID = 4860439408055777358L; - @Override - protected long formatTimestamp(Object value) { - long epochTimestamp = System.currentTimeMillis(); - if (value != null) { - try { - epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value); - } catch (ParseException e) { - //default to current time - } - } - return epochTimestamp; - } + @Override + protected long formatTimestamp(Object value) { + long epochTimestamp = System.currentTimeMillis(); + if (value != null) { + try { + epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value); + } catch (ParseException e) { + //default to current time + } + } + return epochTimestamp; + } - @Override - protected void postParse(JSONObject message) { - removeEmptyFields(message); - message.remove("timestamp_string"); - if (message.containsKey("message")) { - String messageValue = (String) message.get("message"); - if (messageValue.contains("logged into")) { - parseLoginMessage(message); - } - else if (messageValue.contains("logged out")) { - parseLogoutMessage(message); - } - else if (messageValue.contains("rbm(")) { - parseRBMMessage(message); - } - else { - parseOtherMessage(message); - } - } - } + @Override + protected void postParse(JSONObject message) { + removeEmptyFields(message); + message.remove("timestamp_string"); + if (message.containsKey("message")) { + String messageValue = (String) message.get("message"); + if (messageValue.contains("logged into")) { + parseLoginMessage(message); + } else if (messageValue.contains("logged out")) { + parseLogoutMessage(message); + } else if (messageValue.contains("rbm(")) { + parseRBMMessage(message); + } else { + parseOtherMessage(message); + } + } + } - @SuppressWarnings("unchecked") - private void removeEmptyFields(JSONObject json) { - Iterator<Object> keyIter = json.keySet().iterator(); - while (keyIter.hasNext()) { - Object key = keyIter.next(); - Object value = json.get(key); - if (null == value || "".equals(value.toString())) { - keyIter.remove(); - } - } - } + @SuppressWarnings("unchecked") + private void removeEmptyFields(JSONObject json) { + Iterator<Object> keyIter = json.keySet().iterator(); + while (keyIter.hasNext()) { + Object key = keyIter.next(); + Object value = json.get(key); + if (null == value || "".equals(value.toString())) { + keyIter.remove(); + } + } + } - //Extracts the appropriate fields from login messages - @SuppressWarnings("unchecked") - private void parseLoginMessage(JSONObject json) { - json.put("event_subtype", "login"); - String message = (String) json.get("message"); - if (message.contains(":")){ - String parts[] = message.split(":"); - String user = parts[0]; - String ip_src_addr = parts[1]; - if (user.contains("user(") && user.contains(")")) { - user = user.substring(user.indexOf("user(") + "user(".length()); - user = user.substring(0, user.indexOf(")")); - json.put("username", user); - } - if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { - ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); - ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); - json.put("ip_src_addr", ip_src_addr); - } - json.remove("message"); - } - } + //Extracts the appropriate fields from login messages + @SuppressWarnings("unchecked") + private void parseLoginMessage(JSONObject json) { + json.put("event_subtype", "login"); + String message = (String) json.get("message"); + if (message.contains(":")) { + String[] parts = message.split(":"); + String user = parts[0]; + String ip_src_addr = parts[1]; + if (user.contains("user(") && user.contains(")")) { + user = user.substring(user.indexOf("user(") + "user(".length()); + user = user.substring(0, user.indexOf(")")); + json.put("username", user); + } + if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { + ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); + ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); + json.put("ip_src_addr", ip_src_addr); + } + json.remove("message"); + } + } - //Extracts the appropriate fields from logout messages - @SuppressWarnings("unchecked") - private void parseLogoutMessage(JSONObject json) { - json.put("event_subtype", "logout"); - String message = (String) json.get("message"); - if (message.matches(".*'.*'.*'.*'.*")) { - String parts[] = message.split("'"); - String ip_src_addr = parts[0]; - if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { - ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); - ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); - json.put("ip_src_addr", ip_src_addr); - } - json.put("username", parts[1]); - json.put("security_domain", parts[3]); - json.remove("message"); - } - } + //Extracts the appropriate fields from logout messages + @SuppressWarnings("unchecked") + private void parseLogoutMessage(JSONObject json) { + json.put("event_subtype", "logout"); + String message = (String) json.get("message"); + if (message.matches(".*'.*'.*'.*'.*")) { + String parts[] = message.split("'"); + String ip_src_addr = parts[0]; + if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { + ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); + ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); + json.put("ip_src_addr", ip_src_addr); + } + json.put("username", parts[1]); + json.put("security_domain", parts[3]); + json.remove("message"); + } + } - //Extracts the appropriate fields from RBM messages - @SuppressWarnings("unchecked") - private void parseRBMMessage(JSONObject json) { - String message = (String) json.get("message"); - if (message.contains("(")) { - json.put("process", message.substring(0, message.indexOf("("))); - if (message.contains(":")) { - json.put("message", message.substring(message.indexOf(":") + 2)); - } - } - } + //Extracts the appropriate fields from RBM messages + @SuppressWarnings("unchecked") + private void parseRBMMessage(JSONObject json) { + String message = (String) json.get("message"); + if (message.contains("(")) { + json.put("process", message.substring(0, message.indexOf("("))); + if (message.contains(":")) { + json.put("message", message.substring(message.indexOf(":") + 2)); + } + } + } - //Extracts the appropriate fields from other messages - @SuppressWarnings("unchecked") - private void parseOtherMessage(JSONObject json) { - String message = (String) json.get("message"); - if (message.contains("(")) { - json.put("process", message.substring(0, message.indexOf("("))); - if (message.contains(":")) { - json.put("message", message.substring(message.indexOf(":") + 2)); - } - } - } + //Extracts the appropriate fields from other messages + @SuppressWarnings("unchecked") + private void parseOtherMessage(JSONObject json) { + String message = (String) json.get("message"); + if (message.contains("(")) { + json.put("process", message.substring(0, message.indexOf("("))); + if (message.contains(":")) { + json.put("message", message.substring(message.indexOf(":") + 2)); + } + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java index 8441409..2f3784a 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java @@ -42,8 +42,8 @@ public class FiltersTest { put("filter.query", "exists(foo)"); }}; MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config); - Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT())); - Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT())); + Assert.assertTrue(filter.emit(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT())); + Assert.assertFalse(filter.emit(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT())); } } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java index 1a50dea..24386fa 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java @@ -19,6 +19,7 @@ package org.apache.metron.parsers; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public abstract class GrokParserTest { @@ -53,8 +55,10 @@ public abstract class GrokParserTest { JSONObject expected = (JSONObject) jsonParser.parse(e.getValue()); byte[] rawMessage = e.getKey().getBytes(); - - List<JSONObject> parsedList = grokParser.parse(rawMessage); + Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> parsedList = resultOptional.get().getMessages(); Assert.assertEquals(1, parsedList.size()); compare(expected, parsedList.get(0)); } @@ -92,4 +96,5 @@ public abstract class GrokParserTest { public abstract List<String> getTimeFields(); public abstract String getDateFormat(); public abstract String getTimestampField(); + public abstract String getMultiLine(); } http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java index 9769baa..4842a1f 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MessageParserTest.java @@ -19,68 +19,118 @@ package org.apache.metron.parsers; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; public class MessageParserTest { - @Test - public void testNullable() throws Exception { - MessageParser parser = new MessageParser() { - @Override - public void init() { - } + abstract class TestMessageParser implements MessageParser<JSONObject> { + @Override + public void init() { + } - @Override - public List parse(byte[] rawMessage) { - return null; - } + @Override + public boolean validate(JSONObject message) { + return false; + } - @Override - public boolean validate(Object message) { - return false; - } + @Override + public void configure(Map<String, Object> config) { - @Override - public void configure(Map<String, Object> config) { + } + } + @Test + public void testNullable() throws Exception { + MessageParser parser = new TestMessageParser() { + @Override + public List<JSONObject> parse(byte[] rawMessage) { + return null; } }; - Assert.assertNotNull(parser.parseOptional(null)); - Assert.assertFalse(parser.parseOptional(null).isPresent()); + Assert.assertNotNull(parser.parseOptionalResult(null)); + Assert.assertFalse(parser.parseOptionalResult(null).isPresent()); } @Test public void testNotNullable() throws Exception { - MessageParser parser = new MessageParser() { + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public void init() { - + public List<JSONObject> parse(byte[] rawMessage) { + return new ArrayList<>(); } + }; + Assert.assertNotNull(parser.parseOptionalResult(null)); + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult(null); + Assert.assertTrue(ret.isPresent()); + Assert.assertEquals(0, ret.get().getMessages().size()); + } + @Test + public void testParse() { + JSONObject message = new JSONObject(); + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public List parse(byte[] rawMessage) { - return new ArrayList<>(); + public List<JSONObject> parse(byte[] rawMessage) { + return Collections.singletonList(message); } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + Assert.assertEquals(1, ret.get().getMessages().size()); + Assert.assertEquals(message, ret.get().getMessages().get(0)); + } + @Test + public void testParseOptional() { + JSONObject message = new JSONObject(); + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public boolean validate(Object message) { - return false; + public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) { + return Optional.of(Collections.singletonList(message)); } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + Assert.assertEquals(1, ret.get().getMessages().size()); + Assert.assertEquals(message, ret.get().getMessages().get(0)); + } + @Test + public void testParseException() { + MessageParser<JSONObject> parser = new TestMessageParser() { @Override - public void configure(Map<String, Object> config) { + public List<JSONObject> parse(byte[] rawMessage) { + throw new RuntimeException("parse exception"); + } + }; + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); + Assert.assertTrue(ret.isPresent()); + Assert.assertTrue(ret.get().getMasterThrowable().isPresent()); + Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage()); + } + @Test + public void testParseOptionalException() { + MessageParser<JSONObject> parser = new TestMessageParser() { + @Override + public Optional<List<JSONObject>> parseOptional(byte[] rawMessage) { + throw new RuntimeException("parse exception"); } }; - Assert.assertNotNull(parser.parseOptional(null)); - Optional<List> ret = parser.parseOptional(null); + Optional<MessageParserResult<JSONObject>> ret = parser.parseOptionalResult("message".getBytes()); Assert.assertTrue(ret.isPresent()); - Assert.assertEquals(0, ret.get().size()); + Assert.assertTrue(ret.get().getMasterThrowable().isPresent()); + Assert.assertEquals("parse exception", ret.get().getMasterThrowable().get().getMessage()); } + }