GitHub user charlesporter commented on a diff in the pull request: https://github.com/apache/nifi/pull/2411#discussion_r164564814 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java --- @@ -209,78 +287,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { final byte[] byteBuffer = buffer; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - StreamUtils.fillBuffer(in, byteBuffer, false); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false)); final long len = Math.min(byteBuffer.length, flowFile.getSize()); contentString = new String(byteBuffer, 0, (int) len, charset); } finally { bufferQueue.offer(buffer); } - final Match gm = grok.match(contentString); - gm.captures(); - - if (gm.toMap().isEmpty()) { - session.transfer(flowFile, REL_NO_MATCH); - getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); - return; - } - - final ObjectMapper objectMapper = new ObjectMapper(); - switch (context.getProperty(DESTINATION).getValue()) { - case FLOWFILE_ATTRIBUTE: - Map<String, String> grokResults = new HashMap<>(); - for (Map.Entry<String, Object> entry : gm.toMap().entrySet()) { - if (null != entry.getValue()) { - grokResults.put("grok." 
+ entry.getKey(), entry.getValue().toString()); + try{ + for (Grok grok : grokList) { + final Match gm = grok.match(contentString); + gm.captures(); + final Map<String, Object> localResults = gm.toMap(); + if (!localResults.isEmpty()) { + matchedExpressionList.add(grok.getOriginalGrokPattern()); + results.putAll(localResults); + if (breakOnFirstMatch) { + break; } } + } - flowFile = session.putAllAttributes(flowFile, grokResults); - session.getProvenanceReporter().modifyAttributes(flowFile); - session.transfer(flowFile, REL_MATCH); - getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); - - break; - case FLOWFILE_CONTENT: - FlowFile conFlowfile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - out.write(objectMapper.writeValueAsBytes(gm.toMap())); - } - }); - conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); - session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(conFlowfile, REL_MATCH); + if (results.isEmpty()) { + session.transfer(flowFile, REL_NO_MATCH); + getLogger().debug("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; + } + + String matchedExpressions = StringUtils.join(matchedExpressionList, expressionSeparator); + flowFile = session.putAttribute(flowFile, matchedExpressionAttribute, matchedExpressions); - break; + switch (context.getProperty(DESTINATION).getValue()) { + case FLOWFILE_ATTRIBUTE: + Map<String, String> grokResults = new HashMap<>(); + for (Map.Entry<String, Object> entry : results.entrySet()) { + if (null != entry.getValue()) { + grokResults.put(resultPrefix + entry.getKey(), entry.getValue().toString()); + } + } + flowFile = session.putAllAttributes(flowFile, 
grokResults); + session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_MATCH); + getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); + break; + case FLOWFILE_CONTENT: + final ObjectMapper objectMapper = new ObjectMapper(); + FlowFile conFlowfile = session.write(flowFile, (in, out) -> { + out.write(objectMapper.writeValueAsBytes(results)); + }); + + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(conFlowfile, REL_MATCH); + break; + } + }catch(ProcessException t){ + flowFile = session.putAttribute(flowFile, getClass().getSimpleName() + ".exception", t.getMessage()); + session.transfer(flowFile, REL_NO_MATCH); + getLogger().error("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; } } + public static Validator validateMultipleFilesExist() { + return (subject, input, context) -> { + for (String s : input.split(PATTERN_FILE_LIST_SEPARATOR)) { --- End diff -- oops.
---