METRON-1820 Update to new Simple-Syslog-5424 version to support error handling (ottobackwards) closes apache/metron#1234
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/08f3de0f Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/08f3de0f Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/08f3de0f Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 08f3de0fe31fefa828952cbe76456580a4697630 Parents: 6214150 Author: ottobackwards <ottobackwa...@gmail.com> Authored: Mon Oct 15 16:36:36 2018 -0400 Committer: otto <o...@apache.org> Committed: Mon Oct 15 16:36:36 2018 -0400 ---------------------------------------------------------------------- dependencies_with_url.csv | 2 +- .../impl/SensorParserConfigServiceImpl.java | 65 +++++++++++++- .../org/apache/metron/parsers/GrokParser.java | 18 +--- .../apache/metron/parsers/bolt/ParserBolt.java | 37 +++++++- .../parsers/interfaces/MessageParser.java | 18 ---- .../interfaces/MultilineMessageParser.java | 51 +++++++++++ .../metron/parsers/syslog/Syslog5424Parser.java | 44 ++++++++-- .../apache/metron/parsers/GrokParserTest.java | 8 +- .../metron/parsers/MultiLineGrokParserTest.java | 5 +- .../metron/parsers/bolt/ParserBoltTest.java | 91 +++++++++++--------- .../parsers/syslog/Syslog5424ParserTest.java | 55 +++++++++--- .../websphere/GrokWebSphereParserTest.java | 56 ++++++++---- pom.xml | 2 +- 13 files changed, 335 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index fb6c03c..2e1eedd 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -484,4 +484,4 @@ org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator -com.github.palindromicity:simple-syslog-5424:jar:0.0.8:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424 +com.github.palindromicity:simple-syslog-5424:jar:0.0.9:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424 http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java index 85b84b8..4cd272e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.fs.Path; @@ -33,6 +34,8 @@ import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.ParseMessageRequest; @@ -138,13 +141,53 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService } else if (sensorParserConfig.getParserClassName() == null) { throw new RestException("SensorParserConfig must have a parserClassName"); } else { - MessageParser<JSONObject> parser; + MultilineMessageParser<JSONObject> parser; + Object parserObject; try { - parser = (MessageParser<JSONObject>) Class.forName(sensorParserConfig.getParserClassName()) + parserObject = Class.forName(sensorParserConfig.getParserClassName()) .newInstance(); } catch (Exception e) { throw new RestException(e.toString(), e.getCause()); } + + if (!(parserObject instanceof MultilineMessageParser)) { + parser = new MultilineMessageParser<JSONObject>() { + + @Override + @SuppressWarnings("unchecked") + public void configure(Map<String, Object> config) { + ((MessageParser<JSONObject>)parserObject).configure(config); + } + + @Override + @SuppressWarnings("unchecked") + public void init() { + ((MessageParser<JSONObject>)parserObject).init(); + } + + @Override + @SuppressWarnings("unchecked") + public boolean validate(JSONObject message) { + return ((MessageParser<JSONObject>)parserObject).validate(message); + } + + @Override + @SuppressWarnings("unchecked") + public List<JSONObject> parse(byte[] message) { + return ((MessageParser<JSONObject>)parserObject).parse(message); + } + + @Override + @SuppressWarnings("unchecked") + public Optional<List<JSONObject>> parseOptional(byte[] message) { + return ((MessageParser<JSONObject>)parserObject).parseOptional(message); + } + }; + } else { + parser = (MultilineMessageParser<JSONObject>)parserObject; + } + + Path temporaryGrokPath = null; if (isGrokConfig(sensorParserConfig)) { String name = parseMessageRequest.getSensorParserConfig().getSensorTopic(); @@ -152,13 +195,27 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService sensorParserConfig.getParserConfig() .put(MetronRestConstants.GROK_PATH_KEY, new Path(temporaryGrokPath, name).toString()); } + parser.configure(sensorParserConfig.getParserConfig()); parser.init(); - JSONObject results = parser.parse(parseMessageRequest.getSampleData().getBytes()).get(0); + + Optional<MessageParserResult<JSONObject>> result = parser.parseOptionalResult(parseMessageRequest.getSampleData().getBytes()); + if (!result.isPresent()) { + throw new RestException("Unknown error parsing sample data"); + } + + if (result.get().getMasterThrowable().isPresent()) { + throw new RestException("Error parsing sample data",result.get().getMasterThrowable().get()); + } + + if (result.get().getMessages().isEmpty()) { + throw new RestException("No results from parsing sample data"); + } + if (isGrokConfig(sensorParserConfig) && temporaryGrokPath != null) { grokService.deleteTemporary(); } - return results; + return result.get().getMessages().get(0); } } http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index bead477..a81149d 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ import java.util.Optional; import java.util.TimeZone; -public class GrokParser implements MessageParser<JSONObject>, Serializable { +public class GrokParser implements MultilineMessageParser<JSONObject>, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -143,21 +144,6 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable { @SuppressWarnings("unchecked") @Override - public List<JSONObject> parse(byte[] rawMessage) { - Optional<MessageParserResult<JSONObject>> resultOptional = parseOptionalResult(rawMessage); - if (!resultOptional.isPresent()) { - return Collections.EMPTY_LIST; - } - Map<Object,Throwable> errors = resultOptional.get().getMessageThrowables(); - if (!errors.isEmpty()) { - throw new RuntimeException(errors.entrySet().iterator().next().getValue()); - } - - return resultOptional.get().getMessages(); - } - - @SuppressWarnings("unchecked") - @Override public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { if (grok == null) { init(); http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index ff5c1d4..05334c2 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -37,6 +37,7 @@ import org.apache.metron.parsers.filters.Filters; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.apache.metron.parsers.topology.ParserComponents; import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; @@ -307,9 +308,43 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { , sensorParserConfig.getReadMetadata() , sensorParserConfig.getRawMessageStrategyConfig() ); + metadata = rawMessage.getMetadata(); - Optional<MessageParserResult<JSONObject>> results = parser.parseOptionalResult(rawMessage.getMessage()); + MultilineMessageParser mmp = null; + if (!(parser instanceof MultilineMessageParser)) { + mmp = new MultilineMessageParser<JSONObject>() { + + @Override + public void configure(Map<String, Object> config) { + parser.configure(config); + } + + @Override + public void init() { + parser.init(); + } + + @Override + public boolean validate(JSONObject message) { + return parser.validate(message); + } + + @Override + public List<JSONObject> parse(byte[] message) { + return parser.parse(message); + } + + @Override + public Optional<List<JSONObject>> parseOptional(byte[] message) { + return parser.parseOptional(message); + } + }; + } else { + mmp = (MultilineMessageParser) parser; + } + + Optional<MessageParserResult<JSONObject>> results = mmp.parseOptionalResult(rawMessage.getMessage()); // check if there is a master error if (results.isPresent() && results.get().getMasterThrowable().isPresent()) { http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java index f8243b9..665076b 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java @@ -50,24 +50,6 @@ public interface MessageParser<T> extends Configurable { } /** - * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore - * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced - * and the errors. - * @param parseMessage the raw bytes of the message - * @return Optional of {@link MessageParserResult} - */ - default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) { - List<T> list = new ArrayList<>(); - try { - Optional<List<T>> optionalMessages = parseOptional(parseMessage); - optionalMessages.ifPresent(list::addAll); - } catch (Throwable t) { - return Optional.of(new DefaultMessageParserResult<>(t)); - } - return Optional.of(new DefaultMessageParserResult<T>(list)); - } - - /** * Validate the message to ensure that it's correct. * @param message the message to validate * @return true if the message is valid, false if not http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java new file mode 100644 index 0000000..7818f9a --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MultilineMessageParser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers.interfaces; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.metron.parsers.DefaultMessageParserResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public interface MultilineMessageParser<T> extends MessageParser<T> { + + default List<T> parse(byte[] rawMessage) { + throw new NotImplementedException("parse is not implemented"); + } + + /** + * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore + * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced + * and the errors. + * @param parseMessage the raw bytes of the message + * @return Optional of {@link MessageParserResult} + */ + default Optional<MessageParserResult<T>> parseOptionalResult(byte[] parseMessage) { + List<T> list = new ArrayList<>(); + try { + Optional<List<T>> optionalMessages = parseOptional(parseMessage); + optionalMessages.ifPresent(list::addAll); + } catch (Throwable t) { + return Optional.of(new DefaultMessageParserResult<>(t)); + } + return Optional.of(new DefaultMessageParserResult<T>(list)); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java index e3ad941..79a082a 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/syslog/Syslog5424Parser.java @@ -25,23 +25,35 @@ import com.github.palindromicity.syslog.SyslogParserBuilder; import com.github.palindromicity.syslog.dsl.SyslogFieldKeys; import org.apache.commons.lang3.StringUtils; import org.apache.metron.parsers.BasicParser; +import org.apache.metron.parsers.DefaultMessageParserResult; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.Reader; +import java.io.Serializable; import java.io.StringReader; +import java.lang.invoke.MethodHandles; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Parser for well structured RFC 5424 messages. */ -public class Syslog5424Parser extends BasicParser { +public class Syslog5424Parser implements MultilineMessageParser<JSONObject>, Serializable { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String NIL_POLICY_CONFIG = "nilPolicy"; private transient SyslogParser syslogParser; @@ -62,15 +74,31 @@ public class Syslog5424Parser extends BasicParser { } @Override + public boolean validate(JSONObject message) { + JSONObject value = message; + if (!(value.containsKey("original_string"))) { + LOG.trace("[Metron] Message does not have original_string: {}", message); + return false; + } else if (!(value.containsKey("timestamp"))) { + LOG.trace("[Metron] Message does not have timestamp: {}", message); + return false; + } else { + LOG.trace("[Metron] Message conforms to schema: {}", message); + return true; + } + } + + @Override @SuppressWarnings("unchecked") - public List<JSONObject> parse(byte[] rawMessage) { + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { try { if (rawMessage == null || rawMessage.length == 0) { - return null; + return Optional.empty(); } String originalString = new String(rawMessage); List<JSONObject> returnList = new ArrayList<>(); + Map<Object,Throwable> errorMap = new HashMap<>(); try (Reader reader = new BufferedReader(new StringReader(originalString))) { syslogParser.parseLines(reader, (m) -> { JSONObject jsonObject = new JSONObject(m); @@ -79,14 +107,14 @@ public class Syslog5424Parser extends BasicParser { jsonObject.put("original_string", originalString); setTimestamp(jsonObject); returnList.add(jsonObject); - }); + },errorMap::put); - return returnList; + return Optional.of(new DefaultMessageParserResult<JSONObject>(returnList,errorMap)); } - } catch (Exception e) { - String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage(); + } catch (IOException e) { + String message = "Unable to read buffer " + new String(rawMessage) + ": " + e.getMessage(); LOG.error(message, e); - throw new IllegalStateException(message, e); + return Optional.of(new DefaultMessageParserResult<JSONObject>( new IllegalStateException(message, e))); } } http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java index 7fa6a31..24386fa 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java @@ -19,6 +19,7 @@ package org.apache.metron.parsers; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -30,6 +31,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public abstract class GrokParserTest { @@ -53,8 +55,10 @@ public abstract class GrokParserTest { JSONObject expected = (JSONObject) jsonParser.parse(e.getValue()); byte[] rawMessage = e.getKey().getBytes(); - - List<JSONObject> parsedList = grokParser.parse(rawMessage); + Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> parsedList = resultOptional.get().getMessages(); Assert.assertEquals(1, parsedList.size()); compare(expected, parsedList.get(0)); } http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java index cc4d20f..e24a39d 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -61,7 +61,10 @@ public class MultiLineGrokParserTest { Map<String, String> testData = getTestData(); for (Map.Entry<String, String> e : testData.entrySet()) { byte[] rawMessage = e.getKey().getBytes(); - List<JSONObject> parsedList = grokParser.parse(rawMessage); + Optional<MessageParserResult<JSONObject>> resultOptional = grokParser.parseOptionalResult(rawMessage); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> parsedList = resultOptional.get().getMessages(); Assert.assertEquals(10, parsedList.size()); } } http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 0ae2817..e5e7180 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -17,29 +17,7 @@ */ package org.apache.metron.parsers.bolt; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.hbase.util.Bytes; @@ -55,10 +33,10 @@ import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; -import org.apache.metron.parsers.BasicParser; import org.apache.metron.parsers.DefaultMessageParserResult; import org.apache.metron.parsers.interfaces.MessageFilter; -import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.apache.metron.parsers.interfaces.MultilineMessageParser; import org.apache.metron.parsers.topology.ParserComponents; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.test.bolt.BaseBoltTest; @@ -72,10 +50,33 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mock; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class ParserBoltTest extends BaseBoltTest { @Mock - private MessageParser<JSONObject> parser; + private MultilineMessageParser<JSONObject> parser; @Mock private MessageWriter<JSONObject> writer; @@ -210,7 +211,7 @@ public class ParserBoltTest extends BaseBoltTest { byte[] sampleBinary = "some binary message".getBytes(); when(tuple.getBinary(0)).thenReturn(sampleBinary); - when(parser.parseOptional(sampleBinary)).thenReturn(null); + when(parser.parseOptionalResult(sampleBinary)).thenReturn(null); parserBolt.execute(tuple); verify(parser, times(0)).validate(any()); verify(writer, times(0)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), any()); @@ -405,9 +406,9 @@ public class ParserBoltTest extends BaseBoltTest { verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{ + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{ put("field2", "blah"); - }})))); + }}))))); parserBolt.execute(t1); verify(batchWriter, times(0)).write(any(), any(), any(), any()); verify(outputCollector, times(1)).ack(t1); @@ -437,26 +438,36 @@ public class ParserBoltTest extends BaseBoltTest { String sensorType = "dummy"; RecordingWriter recordingWriter = new RecordingWriter(); //create a parser which acts like a basic parser but returns no timestamp field. - BasicParser dummyParser = new BasicParser() { + MultilineMessageParser<JSONObject> dummyParser = new MultilineMessageParser<JSONObject>() { @Override - public void init() { - + public void configure(Map<String, Object> config) { } @Override - public List<JSONObject> parse(byte[] rawMessage) { - return ImmutableList.of(new JSONObject() {{ - put("data", "foo"); - put("timestampstr", "2016-01-05 17:02:30"); - put("original_string", "blah"); - }}); + public void init() { } @Override - public void configure(Map<String, Object> config) { + public boolean validate(JSONObject message) { + Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName()); + if (timestampObject instanceof Long) { + Long timestamp = (Long) timestampObject; + return timestamp > 0; + } + return false; + } + @Override + @SuppressWarnings("unchecked") + public Optional<MessageParserResult<JSONObject>> parseOptionalResult(byte[] rawMessage) { + return Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject() {{ + put("data", "foo"); + put("timestampstr", "2016-01-05 17:02:30"); + put("original_string", "blah"); + }}))); } }; + Map<String, ParserComponents> parserMap = Collections.singletonMap( sensorType, new ParserComponents( @@ -502,7 +513,7 @@ public class ParserBoltTest extends BaseBoltTest { verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); BulkWriterResponse response = new BulkWriterResponse(); Tuple[] uniqueTuples = new Tuple[ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE]; @@ -592,7 +603,7 @@ public class ParserBoltTest extends BaseBoltTest { verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); BulkWriterResponse response = new BulkWriterResponse(); response.addSuccess(t1); http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java index 0ef26ff..b3e4507 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/syslog/Syslog5424ParserTest.java @@ -20,6 +20,7 @@ package org.apache.metron.parsers.syslog; import com.github.palindromicity.syslog.NilPolicy; import com.github.palindromicity.syslog.dsl.SyslogFieldKeys; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; @@ -28,6 +29,7 @@ import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Consumer; public class Syslog5424ParserTest { @@ -87,7 +89,7 @@ public class Syslog5424ParserTest { }); } - @Test(expected = IllegalStateException.class) + @Test() public void testNotValid() { test(null, "not valid", (message) -> Assert.assertTrue(false)); } @@ -100,7 +102,7 @@ public class Syslog5424ParserTest { } parser.configure(config); - List<JSONObject> output = parser.parse(line.getBytes()); + parser.parseOptionalResult(line.getBytes()); } @Test @@ -116,8 +118,33 @@ public class Syslog5424ParserTest { .append(SYSLOG_LINE_MISSING) .append("\n") .append(SYSLOG_LINE_ALL); - List<JSONObject> output = parser.parse(builder.toString().getBytes()); - Assert.assertEquals(3,output.size()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(builder.toString().getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> parsedList = resultOptional.get().getMessages(); + Assert.assertEquals(3,parsedList.size()); + } + + @Test + public void testReadMultiLineWithErrors() throws Exception { + Syslog5424Parser parser = new Syslog5424Parser(); + Map<String, Object> config = new HashMap<>(); + config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name()); + parser.configure(config); + StringBuilder builder = new StringBuilder(); + builder + .append("HEREWEGO!!!!\n") + .append(SYSLOG_LINE_ALL) + .append("\n") + .append(SYSLOG_LINE_MISSING) + .append("\n") + .append("BOOM!\n") + .append(SYSLOG_LINE_ALL) + .append("\nOHMY!"); + Optional<MessageParserResult<JSONObject>> output = parser.parseOptionalResult(builder.toString().getBytes()); + Assert.assertTrue(output.isPresent()); + Assert.assertEquals(3,output.get().getMessages().size()); + Assert.assertEquals(3,output.get().getMessageThrowables().size()); } @Test @@ -126,21 +153,29 @@ public class Syslog5424ParserTest { Map<String, Object> config = new HashMap<>(); config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.DASH.name()); parser.configure(config); - List<JSONObject> output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes()); - String timeStampString = output.get(0).get("timestamp").toString(); + Optional<MessageParserResult<JSONObject>> output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes()); + Assert.assertNotNull(output); + Assert.assertTrue(output.isPresent()); + String timeStampString = output.get().getMessages().get(0).get("timestamp").toString(); DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString); config.clear(); config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.NULL.name()); parser.configure(config); - output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes()); - timeStampString = output.get(0).get("timestamp").toString(); + output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes()); + Assert.assertNotNull(output); + Assert.assertTrue(output.isPresent()); + timeStampString = output.get().getMessages().get(0).get("timestamp").toString(); DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString); config.clear(); config.put(Syslog5424Parser.NIL_POLICY_CONFIG, NilPolicy.OMIT.name()); parser.configure(config); - output = parser.parse(SYSLOG_LINE_MISSING_DATE.getBytes()); - timeStampString = output.get(0).get("timestamp").toString(); + + output = parser.parseOptionalResult(SYSLOG_LINE_MISSING_DATE.getBytes()); + Assert.assertNotNull(output); + Assert.assertTrue(output.isPresent()); + + timeStampString = output.get().getMessages().get(0).get("timestamp").toString(); DateTimeFormatter.ISO_DATE_TIME.parse(timeStampString); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java index 2923a4f..eb447d0 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java @@ -24,11 +24,14 @@ import java.time.*; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.log4j.Level; import org.apache.metron.parsers.GrokParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.JSONObject; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -54,7 +57,10 @@ public class GrokWebSphereParserTest { parser.configure(parserConfig); String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] user(rick007): " + "[120.43.200.6]: User logged into 'cohlOut'."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, UTC).toInstant().toEpochMilli(); @@ -73,14 +79,17 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseLogoutLine() throws Exception { + public void testParseLogoutLine() throws Exception { //Set up parser, parse message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201]: " + "User 'hjpotter' logged out from 'default'."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, UTC).toInstant().toEpochMilli(); @@ -98,14 +107,17 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseRBMLine() throws Exception { + public void testParseRBMLine() throws Exception { //Set up parser, parse message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbm(RBM-Settings): " + "trans(3502888135)[request] gtid(3502888135): RBM: Resource access denied."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, UTC).toInstant().toEpochMilli(); @@ -122,16 +134,18 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseOtherLine() throws Exception { + public void testParseOtherLine() throws Exception { //Set up parser, parse message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans(191): (admin:default:system:*): " + "ntp-service 'NTP Service' - Operational state down"; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); - long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, UTC).toInstant().toEpochMilli(); //Compare fields @@ -153,7 +167,10 @@ public class GrokWebSphereParserTest { parser.configure(parserConfig); String testString = "<133>Apr 15 17:47:28 ABCXML1413 [rojOut][0x81000033][auth][notice] rick007): " + "[120.43.200. User logged into 'cohlOut'."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 47, 28, 0, UTC).toInstant().toEpochMilli(); @@ -172,14 +189,17 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseMalformedLogoutLine() throws Exception { + public void testParseMalformedLogoutLine() throws Exception { //Set up parser, attempt to parse malformed message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<134>Apr 15 18:02:27 PHIXML3RWD [0x81000019][auth][info] [14.122.2.201: " + "User 'hjpotter' logged out from 'default."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 18, 2, 27, 0, UTC).toInstant().toEpochMilli(); @@ -197,14 +217,17 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseMalformedRBMLine() throws Exception { + public void testParseMalformedRBMLine() throws Exception { //Set up parser, parse message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<131>Apr 15 17:36:35 ROBXML3QRS [0x80800018][auth][error] rbmRBM-Settings): " + "trans3502888135)[request] gtid3502888135) RBM: Resource access denied."; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 36, 35, 0, UTC).toInstant().toEpochMilli(); @@ -221,14 +244,17 @@ public class GrokWebSphereParserTest { } @Test - public void tetsParseMalformedOtherLine() throws Exception { + public void testParseMalformedOtherLine() throws Exception { //Set up parser, parse message GrokWebSphereParser parser = new GrokWebSphereParser(); parser.configure(parserConfig); String testString = "<134>Apr 15 17:17:34 SAGPXMLQA333 [0x8240001c][audit][info] trans 191) admindefaultsystem*): " + "ntp-service 'NTP Service' - Operational state down:"; - List<JSONObject> result = parser.parse(testString.getBytes()); + Optional<MessageParserResult<JSONObject>> resultOptional = parser.parseOptionalResult(testString.getBytes()); + Assert.assertNotNull(resultOptional); + Assert.assertTrue(resultOptional.isPresent()); + List<JSONObject> result = resultOptional.get().getMessages(); JSONObject parsedJSON = result.get(0); long expectedTimestamp = ZonedDateTime.of(Year.now(UTC).getValue(), 4, 15, 17, 17, 34, 0, UTC).toInstant().toEpochMilli(); http://git-wip-us.apache.org/repos/asf/metron/blob/08f3de0f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1e6adb0..f412036 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ <global_reflections_version>0.9.10</global_reflections_version> <global_checkstyle_version>8.0</global_checkstyle_version> <global_log4j_core_version>2.1</global_log4j_core_version> - <global_simple_syslog_version>0.0.8</global_simple_syslog_version> + <global_simple_syslog_version>0.0.9</global_simple_syslog_version> <global_spark_version>2.3.1</global_spark_version> </properties>