METRON-1501 Parser messages that fail to validate are dropped silently 
(cestella via justinleet) closes apache/metron#972


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/0d847cf5
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/0d847cf5
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/0d847cf5

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 0d847cf5f91dc0d3b3b6838eb4b4de4aa2cf2fec
Parents: 19b237d
Author: cestella <ceste...@gmail.com>
Authored: Tue Apr 3 10:29:19 2018 -0400
Committer: leet <l...@apache.org>
Committed: Tue Apr 3 10:29:19 2018 -0400

----------------------------------------------------------------------
 metron-platform/metron-parsers/README.md        | 29 +++++++++++++++++++-
 .../apache/metron/parsers/bolt/ParserBolt.java  | 17 ++++++++----
 .../metron/parsers/bolt/ParserBoltTest.java     | 16 ++++++++---
 3 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index 3d9fdfe..6b9d62e 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -45,7 +45,34 @@ There are two general types types of parsers:
       * `ERROR` : Throw an error when a multidimensional map is encountered
     * `jsonpQuery` : A [JSON Path](#json_path) query string. If present, the 
result of the JSON Path query should be a list of messages. This is useful if 
you have a JSON document which contains a list or array of messages embedded in 
it, and you do not have another means of splitting the message.
     * A field called `timestamp` is expected to exist and, if it does not, 
then current time is inserted.  
-    
+
+## Parser Error Routing
+
+Currently, we have a few mechanisms for either deferring processing of
+messages or marking messages as invalid.
+
+### Invalidation Errors
+
+There are two reasons a message will be marked as invalid:
+* Fail [global validation](../metron-common#validation-framework)
+* Fail the parser's validate function (generally that means not having a 
`timestamp` field or an `original_string` field).
+
+Those messages which are marked as invalid are sent to the error queue
+with an indication that they are invalid in the error message.
+
+### Parser Errors
+
+Errors, which are defined as unexpected exceptions happening during the
+parse, are sent along to the error queue with a message indicating that
+there was an error during parsing, along with a stacktrace.  This
+distinguishes them from the invalid messages described above.
+ 
+## Filtered
+
+One can also filter a message by specifying a `filterClassName` in the
+parser config.  Filtered messages are just dropped rather than passed
+through.
+   
 ## Parser Architecture
 
 ![Architecture](parser_arch.png)

http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 6fc4ed7..e996f14 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -193,23 +193,28 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
             message.put(Constants.GUID, UUID.randomUUID().toString());
           }
 
-          if (parser.validate(message) && (filter == null || 
filter.emitTuple(message, stellarContext))) {
-            numWritten++;
-            List<FieldValidator> failedValidators = 
getFailedValidators(message, fieldValidations);
-            if(failedValidators.size() > 0) {
+          if (filter == null || filter.emitTuple(message, stellarContext)) {
+            boolean isInvalid = !parser.validate(message);
+            List<FieldValidator> failedValidators = null;
+            if(!isInvalid) {
+              failedValidators = getFailedValidators(message, 
fieldValidations);
+              isInvalid = !failedValidators.isEmpty();
+            }
+            if( isInvalid) {
               MetronError error = new MetronError()
                       .withErrorType(Constants.ErrorType.PARSER_INVALID)
                       .withSensorType(getSensorType())
                       .addRawMessage(message);
-              Set<String> errorFields = failedValidators.stream()
+              Set<String> errorFields = failedValidators == 
null?null:failedValidators.stream()
                       .flatMap(fieldValidator -> 
fieldValidator.getInput().stream())
                       .collect(Collectors.toSet());
-              if (!errorFields.isEmpty()) {
+              if (errorFields != null && !errorFields.isEmpty()) {
                 error.withErrorFields(errorFields);
               }
               ErrorUtils.handleError(collector, error);
             }
             else {
+              numWritten++;
               writer.write(getSensorType(), tuple, message, 
getConfigurations(), messageGetStrategy);
             }
           }

http://git-wip-us.apache.org/repos/asf/metron/blob/0d847cf5/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 3316b32..6439b2b 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -118,6 +118,9 @@ public class ParserBoltTest extends BaseBoltTest {
   }
 
   private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+    return createUpdater(Optional.empty());
+  }
+  private static ConfigurationsUpdater<ParserConfigurations> 
createUpdater(Optional<Integer> batchSize) {
     return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
       @Override
       public void update(CuratorFramework client, String path, byte[] data) 
throws IOException { }
@@ -153,6 +156,9 @@ public class ParserBoltTest extends BaseBoltTest {
               @Override
               public Map<String, Object> getParserConfig() {
                 return new HashMap<String, Object>() {{
+                  if(batchSize.isPresent()) {
+                    put(IndexingConfigurations.BATCH_SIZE_CONF, 
batchSize.get());
+                  }
                 }};
               }
             };
@@ -502,9 +508,9 @@ public void testImplicitBatchOfOne() throws Exception {
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, 
new WriterHandler(batchWriter)) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+        return ParserBoltTest.createUpdater(Optional.of(5));
       }
-    };
+    } ;
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -524,6 +530,7 @@ public void testImplicitBatchOfOne() throws Exception {
     writeNonBatch(outputCollector, parserBolt, t3);
     writeNonBatch(outputCollector, parserBolt, t4);
     parserBolt.execute(t5);
+    verify(batchWriter, times(1)).write(eq(sensorType), 
any(WriterConfiguration.class), eq(tuples), any());
     verify(outputCollector, times(1)).ack(t1);
     verify(outputCollector, times(1)).ack(t2);
     verify(outputCollector, times(1)).ack(t3);
@@ -540,7 +547,7 @@ public void testImplicitBatchOfOne() throws Exception {
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, 
new WriterHandler(batchWriter)) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater();
+        return ParserBoltTest.createUpdater(Optional.of(5));
       }
     };
 
@@ -552,7 +559,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
     doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), 
any());
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parse(any())).thenReturn(ImmutableList.of(new JSONObject()));
+    
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new 
JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
     parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
@@ -560,6 +567,7 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.execute(t3);
     parserBolt.execute(t4);
     parserBolt.execute(t5);
+    verify(batchWriter, times(1)).write(any(), any(), any(), any());
     verify(outputCollector, times(1)).ack(t1);
     verify(outputCollector, times(1)).ack(t2);
     verify(outputCollector, times(1)).ack(t3);

Reply via email to