Github user charlesporter commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2411#discussion_r164564814
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
 ---
    @@ -209,78 +287,94 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
     
             try {
                 final byte[] byteBuffer = buffer;
    -            session.read(flowFile, new InputStreamCallback() {
    -                @Override
    -                public void process(InputStream in) throws IOException {
    -                    StreamUtils.fillBuffer(in, byteBuffer, false);
    -                }
    -            });
    +            session.read(flowFile, in -> StreamUtils.fillBuffer(in, 
byteBuffer, false));
                 final long len = Math.min(byteBuffer.length, 
flowFile.getSize());
                 contentString = new String(byteBuffer, 0, (int) len, charset);
             } finally {
                 bufferQueue.offer(buffer);
             }
     
    -        final Match gm = grok.match(contentString);
    -        gm.captures();
    -
    -        if (gm.toMap().isEmpty()) {
    -            session.transfer(flowFile, REL_NO_MATCH);
    -            getLogger().info("Did not match any Grok Expressions for 
FlowFile {}", new Object[]{flowFile});
    -            return;
    -        }
    -
    -        final ObjectMapper objectMapper = new ObjectMapper();
    -        switch (context.getProperty(DESTINATION).getValue()) {
    -            case FLOWFILE_ATTRIBUTE:
    -                Map<String, String> grokResults = new HashMap<>();
    -                for (Map.Entry<String, Object> entry : 
gm.toMap().entrySet()) {
    -                    if (null != entry.getValue()) {
    -                        grokResults.put("grok." + entry.getKey(), 
entry.getValue().toString());
    +        try{
    +            for (Grok grok : grokList) {
    +                final Match gm = grok.match(contentString);
    +                gm.captures();
    +                final Map<String, Object> localResults = gm.toMap();
    +                if (!localResults.isEmpty()) {
    +                    
matchedExpressionList.add(grok.getOriginalGrokPattern());
    +                    results.putAll(localResults);
    +                    if (breakOnFirstMatch) {
    +                        break;
                         }
                     }
    +            }
     
    -                flowFile = session.putAllAttributes(flowFile, grokResults);
    -                session.getProvenanceReporter().modifyAttributes(flowFile);
    -                session.transfer(flowFile, REL_MATCH);
    -                getLogger().info("Matched {} Grok Expressions and added 
attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
    -
    -                break;
    -            case FLOWFILE_CONTENT:
    -                FlowFile conFlowfile = session.write(flowFile, new 
StreamCallback() {
    -                    @Override
    -                    public void process(InputStream in, OutputStream out) 
throws IOException {
    -                        
out.write(objectMapper.writeValueAsBytes(gm.toMap()));
    -                    }
    -                });
    -                conFlowfile = session.putAttribute(conFlowfile, 
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
    -                session.getProvenanceReporter().modifyContent(conFlowfile, 
"Replaced content with parsed Grok fields and values", 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -                session.transfer(conFlowfile, REL_MATCH);
    +            if (results.isEmpty()) {
    +                session.transfer(flowFile, REL_NO_MATCH);
    +                getLogger().debug("Did not match any Grok Expression for 
FlowFile {}", new Object[]{flowFile});
    +                return;
    +            }
    +
    +            String matchedExpressions = 
StringUtils.join(matchedExpressionList, expressionSeparator);
    +            flowFile = session.putAttribute(flowFile, 
matchedExpressionAttribute, matchedExpressions);
     
    -                break;
    +            switch (context.getProperty(DESTINATION).getValue()) {
    +                case FLOWFILE_ATTRIBUTE:
    +                    Map<String, String> grokResults = new HashMap<>();
    +                    for (Map.Entry<String, Object> entry : 
results.entrySet()) {
    +                        if (null != entry.getValue()) {
    +                            grokResults.put(resultPrefix + entry.getKey(), 
entry.getValue().toString());
    +                        }
    +                    }
    +                    flowFile = session.putAllAttributes(flowFile, 
grokResults);
    +                    
session.getProvenanceReporter().modifyAttributes(flowFile);
    +                    session.transfer(flowFile, REL_MATCH);
    +                    getLogger().info("Matched {} Grok Expressions and 
added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
    +                    break;
    +                case FLOWFILE_CONTENT:
    +                    final ObjectMapper objectMapper = new ObjectMapper();
    +                    FlowFile conFlowfile = session.write(flowFile, (in, 
out) -> {
    +                        out.write(objectMapper.writeValueAsBytes(results));
    +                    });
    +
    +                    conFlowfile = session.putAttribute(conFlowfile, 
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
    +                    
session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content 
with parsed Grok fields and values", 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                    session.transfer(conFlowfile, REL_MATCH);
    +                    break;
    +            }
    +        }catch(ProcessException t){
    +            flowFile = session.putAttribute(flowFile, 
getClass().getSimpleName() + ".exception", t.getMessage());
    +            session.transfer(flowFile, REL_NO_MATCH);
    +            getLogger().error("Did not match any Grok Expression for 
FlowFile {}", new Object[]{flowFile});
    +            return;
             }
         }
     
    +    public static Validator validateMultipleFilesExist() {
    +        return (subject, input, context) -> {
    +            for (String s : input.split(PATTERN_FILE_LIST_SEPARATOR)) {
    --- End diff --
    
    oops.


---

Reply via email to