[ https://issues.apache.org/jira/browse/NIFI-2851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582224#comment-15582224 ]
ASF GitHub Bot commented on NIFI-2851: -------------------------------------- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83639515 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; + private static final List<PropertyDescriptor> properties; + private static final Set<Relationship> relationships; - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); + static { + properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); - this.properties = Collections.unmodifiableList(properties); - final Set<Relationship> relationships = new HashSet<>(); + relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); } - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - List<ValidationResult> results = new ArrayList<>(); - - final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 - && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - - results.add(new ValidationResult.Builder() - .subject("Maximum Fragment Size") - .valid(!invalidState) - .explanation("Property must be specified when Line Split Count is 0") - .build() - ); - return results; - } - - @Override - public 
Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - - byte[] leadingBytes = leadingNewLineBytes; - int numLines = 0; - long totalBytes = 0L; - for (int i = 0; i < maxNumLines; i++) { - final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); - final long bytes = eolMarker.getBytesConsumed(); - leadingBytes = eolMarker.getLeadingNewLineBytes(); - - if (includeLineDelimiter && out != null) { - if (leadingBytes != null) { - out.write(leadingBytes); - leadingBytes = null; - } - eolBuffer.drainTo(out); - } - totalBytes += bytes; - if (bytes <= 0) { - return numLines; - } - numLines++; - if (totalBytes >= maxByteCount) { - break; - } - } - return numLines; - } - - private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { - long bytesRead = 0L; - final ByteArrayOutputStream buffer; - if (out != null) { - buffer = new ByteArrayOutputStream(); - } else { - buffer = null; - } - byte[] bytesToWriteFirst = leadingNewLineBytes; - - in.mark(Integer.MAX_VALUE); - while (true) { - final int nextByte = in.read(); - - // if we hit end of stream we're done - if (nextByte == -1) { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"? 
- } + private volatile boolean removeTrailingNewLines; - // Verify leading bytes do not violate size limitation - if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) { - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } - // Write leadingNewLines, if appropriate - if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) { - bytesRead += bytesToWriteFirst.length; - buffer.write(bytesToWriteFirst); - bytesToWriteFirst = null; - } - // buffer the output - bytesRead++; - if (buffer != null && nextByte != '\n' && nextByte != '\r') { - if (bytesToWriteFirst != null) { - buffer.write(bytesToWriteFirst); - } - bytesToWriteFirst = null; - eolBuffer.drainTo(buffer); - eolBuffer.clear(); - buffer.write(nextByte); - } + private volatile long maxSplitSize; - // check the size limit - if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) { - in.reset(); - if (buffer != null) { - buffer.close(); - } - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } + private volatile int lineCount; - // if we have a new line, then we're done - if (nextByte == '\n') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - eolBuffer.addEndOfLine(false, true); - } - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); - } + private volatile int headerLineCount; - // Determine if \n follows \r; in either case, end of line has been reached - if (nextByte == '\r') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - in.mark(1); - final int lookAheadByte = in.read(); - if (lookAheadByte == '\n') { - eolBuffer.addEndOfLine(true, true); - return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst); - } else { - in.reset(); - eolBuffer.addEndOfLine(true, false); - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); - } - } - } - } + private volatile String 
headerMarker; - private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize, - final long bufferedBytes) throws IOException { - final SplitInfo info = new SplitInfo(); - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - int lastByte = -1; - info.lengthBytes = bufferedBytes; - long lastEolBufferLength = 0L; - - while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r')) - && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0) - && eolBuffer.length() < maxSize) { - in.mark(1); - final int nextByte = in.read(); - // Check for \n following \r on last line - if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') { - in.reset(); - break; - } - switch (nextByte) { - case -1: - info.endOfStream = true; - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - return info; - case '\r': - eolBuffer.addEndOfLine(true, false); - info.lengthLines++; - info.bufferedBytes = 0; - break; - case '\n': - eolBuffer.addEndOfLine(false, true); - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - break; - default: - if (eolBuffer.length() > 0) { - info.lengthBytes += eolBuffer.length(); - lastEolBufferLength = eolBuffer.length(); - eolBuffer.clear(); - } - info.lengthBytes++; - info.bufferedBytes++; - break; - } - lastByte = nextByte; - } - // if current line exceeds size and not keeping eol characters, remove previously applied eol characters - if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) { - info.lengthBytes -= lastEolBufferLength; - } - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - return info; + @Override + public Set<Relationship> getRelationships() { + return Collections.unmodifiableSet(relationships); } - private int countHeaderLines(final 
ByteCountingInputStream in, - final String headerMarker) throws IOException { - int headerInfo = 0; - - final BufferedReader br = new BufferedReader(new InputStreamReader(in)); - in.mark(Integer.MAX_VALUE); - String line = br.readLine(); - while (line != null) { - // if line is not a header line, reset stream and return header counts - if (!line.startsWith(headerMarker)) { - in.reset(); - return headerInfo; - } else { - headerInfo++; - } - line = br.readLine(); - } - in.reset(); - return headerInfo; + /** + * + */ + @OnScheduled + public void onSchedule(ProcessContext context) { + this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet() + ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false; + this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() + ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; + this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); + this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); + this.headerMarker = context.getProperty(HEADER_MARKER).getValue(); } + /** + * Will split the incoming stream releasing all splits as FlowFile at once. + */ @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ComponentLog logger = getLogger(); - final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); - final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0) - ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger(); - final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() - ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; - final String headerMarker = context.getProperty(HEADER_MARKER).getValue(); - final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); - - final AtomicReference<String> errorMessage = new AtomicReference<>(null); - final ArrayList<SplitInfo> splitInfos = new ArrayList<>(); - - final long startNanos = System.nanoTime(); - final List<FlowFile> splits = new ArrayList<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - - long bufferedPartialLine = 0; - - // if we have header lines, copy them into a ByteArrayOutputStream - final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); - // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property - int headerInfoLineCount = 0; - if (headerCount > 0) { - headerInfoLineCount = headerCount; - } else { - if (headerMarker != null) { - headerInfoLineCount = countHeaderLines(in, headerMarker); + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile != null) { + AtomicBoolean error = new AtomicBoolean(); + List<FlowFile> splitFlowFiles = new ArrayList<>(); + List<SplitInfo> computedSplitsInfo = new ArrayList<>(); + AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + TextLineDemarcator demarcator = new TextLineDemarcator(in); + SplitInfo splitInfo = null; + long startOffset = 0; + + // Compute fragment representing the header (if available) + long start = System.nanoTime(); + try { + 
if (SplitText.this.headerLineCount > 0) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null); + if (splitInfo.lineCount < SplitText.this.headerLineCount) { + error.set(true); + getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required " + + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure."); + } + } else if (SplitText.this.headerMarker != null) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null); } + headerSplitInfoRef.set(splitInfo); + } catch (IllegalStateException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure."); } - final byte[] headerNewLineBytes; - final byte[] headerBytesWithoutTrailingNewLines; - if (headerInfoLineCount > 0) { - final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null); - - if (headerLinesCopied < headerInfoLineCount) { - errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines"); - return; + // Compute and collect fragments representing the individual splits + if (!error.get()) { + if (headerSplitInfoRef.get() != null) { + startOffset = headerSplitInfoRef.get().length; } - - // Break header apart into trailing newlines and remaining text - final byte[] headerBytes = headerStream.toByteArray(); - int headerNewLineByteCount = 0; - for (int i = headerBytes.length - 1; i >= 0; i--) { - final byte headerByte = headerBytes[i]; - - if (headerByte == '\r' || headerByte == '\n') { - headerNewLineByteCount++; - } else { - break; - } + long preAccumulatedLength = startOffset; + while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) { + computedSplitsInfo.add(splitInfo); 
+ startOffset += splitInfo.length; } - - if (headerNewLineByteCount == 0) { - headerNewLineBytes = null; - headerBytesWithoutTrailingNewLines = headerBytes; - } else { - headerNewLineBytes = new byte[headerNewLineByteCount]; - System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount); - - headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount]; - System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount); + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Computed splits in " + (stop - start) + " milliseconds."); } - } else { - headerBytesWithoutTrailingNewLines = null; - headerNewLineBytes = null; } - - while (true) { - if (headerInfoLineCount > 0) { - // if we have header lines, create a new FlowFile, copy the header lines to that file, - // and then start copying lines - final AtomicInteger linesCopied = new AtomicInteger(0); - final AtomicLong bytesCopied = new AtomicLong(0L); - FlowFile splitFile = session.create(flowFile); - try { - splitFile = session.write(splitFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); - final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) { - countingOut.write(headerBytesWithoutTrailingNewLines); - //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already - linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut, - includeLineDelimiter, headerNewLineBytes)); - bytesCopied.set(countingOut.getBytesWritten()); - } - } - }); - splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get())); - splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, 
String.valueOf(bytesCopied.get())); - logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()}); - } finally { - if (linesCopied.get() > 0) { - splits.add(splitFile); + } + }); + if (!error.get()) { + FlowFile headerFlowFile = null; + long headerCrlfLength = 0; + if (headerSplitInfoRef.get() != null) { + headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length); --- End diff -- I believe it's OK here since it is completely encapsulated within the SplitText instance, making it essentially an extension of SplitText. I would agree if SplitInfo were a 'static class ...'. > Improve performance of SplitText > -------------------------------- > > Key: NIFI-2851 > URL: https://issues.apache.org/jira/browse/NIFI-2851 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Reporter: Mark Payne > Assignee: Oleg Zhurakousky > Fix For: 1.1.0 > > > SplitText is fairly CPU-intensive and quite slow. A simple flow that splits a > 1.4 million line text file into 5k line chunks and then splits those 5k line > chunks into 1 line chunks is only capable of pushing through about 10k lines > per second. This equates to about 10 MB/sec. JVisualVM shows that the > majority of the time is spent in the locateSplitPoint() method. Isolating > this code and inspecting how it works, and using some micro-benchmarking, it > appears that if we refactor the calls to InputStream.read() to instead read > into a byte array, we can improve performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)