This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1c588f10b276dc8d2937b2c78caf0e034bc43fa8 Author: Koji Kawamura <ijokaruma...@apache.org> AuthorDate: Mon Apr 8 11:35:30 2019 +0900 NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText Refactored to use functions to better handle strategy specific variables via closure. --- .../nifi/processors/standard/ReplaceText.java | 233 ++++++++++----------- 1 file changed, 105 insertions(+), 128 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index 851770e..773458b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -53,7 +53,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.LineDemarcator; import org.apache.nifi.util.StopWatch; -import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; @@ -386,31 +385,28 @@ public class ReplaceText extends AbstractProcessor { } }); } else { - flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); - } + flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + ((bw, oneLine) -> { + // We need to determine what line ending was used and use that after our replacement value. + lineEndingBuilder.setLength(0); + for (int i = oneLine.length() - 1; i >= 0; i--) { + final char c = oneLine.charAt(i); + if (c == '\r' || c == '\n') { + lineEndingBuilder.append(c); + } else { + break; + } + } - return flowFile; - } + bw.write(replacementValue); - public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException { - final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - final StringBuilder lineEndingBuilder = new StringBuilder(2); - // We need to determine what line ending was used and use that after our replacement value. - lineEndingBuilder.setLength(0); - for (int i = oneLine.length() - 1; i >= 0; i--) { - final char c = oneLine.charAt(i); - if (c == '\r' || c == '\n') { - lineEndingBuilder.append(c); - } else { - break; - } + // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder. + // So if builder has multiple characters, they are now reversed from the original string's ordering. + bw.write(lineEndingBuilder.reverse().toString()); + }))); } - bw.write(replacementValue); - - // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder. - // So if builder has multiple characters, they are now reversed from the original string's ordering. - bw.write(lineEndingBuilder.reverse().toString()); + return flowFile; } @Override @@ -433,7 +429,8 @@ public class ReplaceText extends AbstractProcessor { } }); } else { - flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); + flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + (bw, oneLine) -> bw.write(replacementValue.concat(oneLine)))); } return flowFile; } @@ -443,11 +440,6 @@ public class ReplaceText extends AbstractProcessor { return false; } - @Override - public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException { - final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - bw.write(replacementValue.concat(oneLine)); - } } private class AppendReplace implements ReplacementStrategyExecutor { @@ -465,36 +457,34 @@ public class ReplaceText extends AbstractProcessor { } }); } else { - flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null)); - } - return flowFile; - } - - public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException { - String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - // we need to find the first carriage return or new-line so that we can append the new value - // before the line separate. However, we don't want to do this using a regular expression due - // to performance concerns. So we will find the first occurrence of either \r or \n and use - // that to insert the replacement value. - boolean foundNewLine = false; - for (int i = 0; i < oneLine.length(); i++) { - final char c = oneLine.charAt(i); - if (foundNewLine) { - bw.write(c); - continue; - } - - if (c == '\r' || c == '\n') { - bw.write(replacementValue); - foundNewLine = true; - } - - bw.write(c); - } + flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + (bw, oneLine) -> { + // we need to find the first carriage return or new-line so that we can append the new value + // before the line separate. However, we don't want to do this using a regular expression due + // to performance concerns. So we will find the first occurrence of either \r or \n and use + // that to insert the replacement value. + boolean foundNewLine = false; + for (int i = 0; i < oneLine.length(); i++) { + final char c = oneLine.charAt(i); + if (foundNewLine) { + bw.write(c); + continue; + } + + if (c == '\r' || c == '\n') { + bw.write(replacementValue); + foundNewLine = true; + } + + bw.write(c); + } - if (!foundNewLine) { - bw.write(replacementValue); + if (!foundNewLine) { + bw.write(replacementValue); + } + })); } + return flowFile; } @Override @@ -580,46 +570,42 @@ public class ReplaceText extends AbstractProcessor { } } else { - updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern)); - } + final Matcher matcher = searchPattern.matcher(""); + updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + (bw, oneLine) -> { + additionalAttrs.clear(); + matcher.reset(oneLine); - return updatedFlowFile; - } + int matches = 0; + StringBuffer sb = new StringBuffer(); - public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException { - additionalAttrs.clear(); - if (matcher == null) { - matcher = searchPattern.matcher(oneLine); - } else { - matcher.reset(oneLine); - } + while (matcher.find()) { + matches++; - int matches = 0; - StringBuffer sb = new StringBuffer(); + for (int i=0; i <= matcher.groupCount(); i++) { + additionalAttrs.put("$" + i, matcher.group(i)); + } - while (matcher.find()) { - matches++; + String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue(); + replacement = escapeLiteralBackReferences(replacement, numCapturingGroups); + String replacementFinal = normalizeReplacementString(replacement); - for (int i=0; i <= matcher.groupCount(); i++) { - additionalAttrs.put("$" + i, matcher.group(i)); - } + matcher.appendReplacement(sb, replacementFinal); + } - String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue(); - replacement = escapeLiteralBackReferences(replacement, numCapturingGroups); - String replacementFinal = normalizeReplacementString(replacement); + if (matches > 0) { + matcher.appendTail(sb); - matcher.appendReplacement(sb, replacementFinal); + final String updatedValue = sb.toString(); + bw.write(updatedValue); + } else { + // No match. Just write out the line as it was. + bw.write(oneLine); + } + })); } - if (matches > 0) { - matcher.appendTail(sb); - - final String updatedValue = sb.toString(); - bw.write(updatedValue); - } else { - // No match. Just write out the line as it was. - bw.write(oneLine); - } + return updatedFlowFile; } @Override @@ -657,30 +643,28 @@ public class ReplaceText extends AbstractProcessor { } else { final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL); - flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern)); - } - return flowFile; - } - - public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException { - String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); - int matches = 0; - int lastEnd = 0; - + flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(), + (bw, oneLine) -> { + int matches = 0; + int lastEnd = 0; - while (matcher.find()) { - bw.write(oneLine, lastEnd, matcher.start() - lastEnd); - bw.write(replacementValue); - matches++; + final Matcher matcher = searchPattern.matcher(oneLine); + while (matcher.find()) { + bw.write(oneLine, lastEnd, matcher.start() - lastEnd); + bw.write(replacementValue); + matches++; - lastEnd = matcher.end(); - } + lastEnd = matcher.end(); + } - if (matches > 0) { - bw.write(oneLine, lastEnd, oneLine.length() - lastEnd); - } else { - bw.write(oneLine); + if (matches > 0) { + bw.write(oneLine, lastEnd, oneLine.length() - lastEnd); + } else { + bw.write(oneLine); + } + })); } + return flowFile; } @Override @@ -706,49 +690,43 @@ public class ReplaceText extends AbstractProcessor { FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize); boolean isAllDataBufferedForEntireText(); + } - void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException ; + @FunctionalInterface + private interface ReplaceLine { + void apply(BufferedWriter bw, String oneLine) throws IOException; } private class StreamReplaceCallback implements StreamCallback { private final Charset charset; private final int maxBufferSize; - private final ProcessContext context; - private final FlowFile flowFile; - private final ReplacementStrategyExecutor replacementStrategyExecutor; - private final Pattern searchPattern; + private final String lineByLineEvaluationMode; + private final ReplaceLine replaceLine; - public StreamReplaceCallback(ReplacementStrategyExecutor replacementStrategyExecutor, - Charset charset, + private StreamReplaceCallback(Charset charset, int maxBufferSize, - ProcessContext context, - FlowFile flowFile, - @Nullable Pattern searchPattern) { - this.replacementStrategyExecutor = replacementStrategyExecutor; + String lineByLineEvaluationMode, + ReplaceLine replaceLine) { this.charset = charset; this.maxBufferSize = maxBufferSize; - this.context = context; - this.flowFile = flowFile; - this.searchPattern = searchPattern; + this.lineByLineEvaluationMode = lineByLineEvaluationMode; + this.replaceLine = replaceLine; } @Override public void process(final InputStream in, final OutputStream out) throws IOException { - final String lineByLineEvaluationMode = context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(); try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192); final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) { String precedingLine = demarcator.nextLine(); String succeedingLine; - Matcher matcher = null; boolean firstLine = true; while (null != (succeedingLine = demarcator.nextLine())) { - matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null; if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)){ - replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile); + replaceLine.apply(bw, precedingLine); firstLine = false; } else if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) { firstLine = false; @@ -757,7 +735,7 @@ public class ReplaceText extends AbstractProcessor { || lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE) || lineByLineEvaluationMode.equalsIgnoreCase(ALL) || (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) { - replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile); + replaceLine.apply(bw, precedingLine); } else { bw.write(precedingLine); } @@ -771,8 +749,7 @@ public class ReplaceText extends AbstractProcessor { || (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) { bw.write(precedingLine); } else { - matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null; - replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile); + replaceLine.apply(bw, precedingLine); } } }