METRON-1501 Parser messages that fail to validate are dropped silently (cestella via justinleet) closes apache/metron#972
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/0d847cf5 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/0d847cf5 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/0d847cf5 Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 0d847cf5f91dc0d3b3b6838eb4b4de4aa2cf2fec Parents: 19b237d Author: cestella <ceste...@gmail.com> Authored: Tue Apr 3 10:29:19 2018 -0400 Committer: leet <l...@apache.org> Committed: Tue Apr 3 10:29:19 2018 -0400 ---------------------------------------------------------------------- metron-platform/metron-parsers/README.md | 29 +++++++++++++++++++- .../apache/metron/parsers/bolt/ParserBolt.java | 17 ++++++++---- .../metron/parsers/bolt/ParserBoltTest.java | 16 ++++++++--- 3 files changed, 51 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 3d9fdfe..6b9d62e 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -45,7 +45,34 @@ There are two general types types of parsers: * `ERROR` : Throw an error when a multidimensional map is encountered * `jsonpQuery` : A [JSON Path](#json_path) query string. If present, the result of the JSON Path query should be a list of messages. This is useful if you have a JSON document which contains a list or array of messages embedded in it, and you do not have another means of splitting the message. * A field called `timestamp` is expected to exist and, if it does not, then current time is inserted. - + +## Parser Error Routing + +Currently, we have a few mechanisms for either deferring processing of +messages or marking messages as invalid. 
+ +### Invalidation Errors + +There are two reasons a message will be marked as invalid: +* Fail [global validation](../metron-common#validation-framework) +* Fail the parser's validate function (generally, that means the message lacks a `timestamp` field or an `original_string` field). + +Those messages which are marked as invalid are sent to the error queue +with an indication that they are invalid in the error message. + +### Parser Errors + +Errors, which are defined as unexpected exceptions happening during the +parse, are sent along to the error queue with a message indicating that +there was an error in parse along with a stacktrace. This is to +distinguish them from the invalid messages. + +## Filtered + +One can also filter a message by specifying a `filterClassName` in the +parser config. Filtered messages are just dropped rather than passed +through. + ## Parser Architecture ![Architecture](parser_arch.png) http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 6fc4ed7..e996f14 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -193,23 +193,28 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { message.put(Constants.GUID, UUID.randomUUID().toString()); } - if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) { - numWritten++; - List<FieldValidator> failedValidators = getFailedValidators(message, fieldValidations); - if(failedValidators.size() > 0) { + if (filter == null || 
filter.emitTuple(message, stellarContext)) { + boolean isInvalid = !parser.validate(message); + List<FieldValidator> failedValidators = null; + if(!isInvalid) { + failedValidators = getFailedValidators(message, fieldValidations); + isInvalid = !failedValidators.isEmpty(); + } + if( isInvalid) { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_INVALID) .withSensorType(getSensorType()) .addRawMessage(message); - Set<String> errorFields = failedValidators.stream() + Set<String> errorFields = failedValidators == null?null:failedValidators.stream() .flatMap(fieldValidator -> fieldValidator.getInput().stream()) .collect(Collectors.toSet()); - if (!errorFields.isEmpty()) { + if (errorFields != null && !errorFields.isEmpty()) { error.withErrorFields(errorFields); } ErrorUtils.handleError(collector, error); } else { + numWritten++; writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy); } } http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 3316b32..6439b2b 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -118,6 +118,9 @@ public class ParserBoltTest extends BaseBoltTest { } private static ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return createUpdater(Optional.empty()); + } + private static ConfigurationsUpdater<ParserConfigurations> createUpdater(Optional<Integer> batchSize) { return new ConfigurationsUpdater<ParserConfigurations>(null, null) { 
@Override public void update(CuratorFramework client, String path, byte[] data) throws IOException { } @@ -153,6 +156,9 @@ public class ParserBoltTest extends BaseBoltTest { @Override public Map<String, Object> getParserConfig() { return new HashMap<String, Object>() {{ + if(batchSize.isPresent()) { + put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize.get()); + } }}; } }; @@ -502,9 +508,9 @@ public void testImplicitBatchOfOne() throws Exception { ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + return ParserBoltTest.createUpdater(Optional.of(5)); } - }; + } ; parserBolt.setCuratorFramework(client); parserBolt.setZKCache(cache); @@ -524,6 +530,7 @@ public void testImplicitBatchOfOne() throws Exception { writeNonBatch(outputCollector, parserBolt, t3); writeNonBatch(outputCollector, parserBolt, t4); parserBolt.execute(t5); + verify(batchWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any()); verify(outputCollector, times(1)).ack(t1); verify(outputCollector, times(1)).ack(t2); verify(outputCollector, times(1)).ack(t3); @@ -540,7 +547,7 @@ public void testImplicitBatchOfOne() throws Exception { ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { - return ParserBoltTest.createUpdater(); + return ParserBoltTest.createUpdater(Optional.of(5)); } }; @@ -552,7 +559,7 @@ public void testImplicitBatchOfOne() throws Exception { doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject())); + when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); 
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); parserBolt.withMessageFilter(filter); parserBolt.execute(t1); @@ -560,6 +567,7 @@ public void testImplicitBatchOfOne() throws Exception { parserBolt.execute(t3); parserBolt.execute(t4); parserBolt.execute(t5); + verify(batchWriter, times(1)).write(any(), any(), any(), any()); verify(outputCollector, times(1)).ack(t1); verify(outputCollector, times(1)).ack(t2); verify(outputCollector, times(1)).ack(t3);