[ 
https://issues.apache.org/jira/browse/NIFI-1118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310953#comment-15310953
 ] 

ASF GitHub Bot commented on NIFI-1118:
--------------------------------------

Github user mosermw commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/444#discussion_r65425886
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 ---
    @@ -147,138 +193,200 @@ protected void init(final 
ProcessorInitializationContext context) {
             return properties;
         }
     
    -    private int readLines(final InputStream in, final int maxNumLines, 
final OutputStream out, final boolean keepAllNewLines, final byte[] 
leadingNewLineBytes) throws IOException {
    +    private int readLines(final InputStream in, final int maxNumLines, 
final long maxByteCount, final OutputStream out,
    +                          final boolean includeLineDelimiter, final byte[] 
leadingNewLineBytes) throws IOException {
             final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
     
    -        int numLines = 0;
             byte[] leadingBytes = leadingNewLineBytes;
    +        int numLines = 0;
    +        long totalBytes = 0L;
             for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = locateEndOfLine(in, out, 
false, eolBuffer, leadingBytes);
    +            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, 
out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    +            final long bytes = eolMarker.getBytesConsumed();
                 leadingBytes = eolMarker.getLeadingNewLineBytes();
     
    -            if (keepAllNewLines && out != null) {
    +            if (includeLineDelimiter && out != null) {
                     if (leadingBytes != null) {
                         out.write(leadingBytes);
                         leadingBytes = null;
                     }
    -
                     eolBuffer.drainTo(out);
                 }
    -
    -            if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 
0L) {
    -                numLines++;
    +            totalBytes += bytes;
    +            if (bytes <= 0) {
    +                return numLines;
                 }
    -
    -            if (eolMarker.isStreamEnded()) {
    +            numLines++;
    +            if (totalBytes >= maxByteCount) {
                     break;
                 }
             }
    -
             return numLines;
         }
     
    -    private EndOfLineMarker locateEndOfLine(final InputStream in, final 
OutputStream out, final boolean includeLineDelimiter,
    -        final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) 
throws IOException {
    -
    +    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, 
final OutputStream out, final long bytesReadSoFar, final long maxSize,
    +                                                   final boolean 
includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] 
leadingNewLineBytes) throws IOException {
             int lastByte = -1;
             long bytesRead = 0L;
    +        final ByteArrayOutputStream buffer;
    +        if (out != null) {
    +            buffer = new ByteArrayOutputStream();
    +        } else {
    +            buffer = null;
    +        }
             byte[] bytesToWriteFirst = leadingNewLineBytes;
     
    +        in.mark(Integer.MAX_VALUE);
             while (true) {
    -            in.mark(1);
                 final int nextByte = in.read();
     
    -            final boolean isNewLineChar = nextByte == '\r' || nextByte == 
'\n';
    -
    -            // if we hit end of stream or new line we're done
    +            // if we hit end of stream we're done
                 if (nextByte == -1) {
    -                if (lastByte == '\r') {
    -                    eolBuffer.addEndOfLine(true, false);
    +                if (buffer != null) {
    +                    buffer.writeTo(out);
    +                    buffer.close();
                     }
    -
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);
    +                return new EndOfLineMarker(bytesRead, eolBuffer, true, 
bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
                 }
     
    -            // If we get a character that's not an end-of-line char, then
    -            // we need to write out the EOL's that we have buffered (if 
out != null).
    -            // Then, we need to reset our EOL buffer because we no longer 
have consecutive EOL's
    -            if (!isNewLineChar) {
    +            // Verify leading bytes do not violate size limitation
    +            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + 
bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    +                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
    +            }
    +            // Write leadingNewLines, if appropriate
    +            if ( buffer != null && includeLineDelimiter && 
bytesToWriteFirst != null) {
    +                bytesRead += bytesToWriteFirst.length;
    +                buffer.write(bytesToWriteFirst);
    +                bytesToWriteFirst = null;
    +            }
    +            // buffer the output
    +            bytesRead++;
    +            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
                     if (bytesToWriteFirst != null) {
    -                    if (out != null) {
    -                        out.write(bytesToWriteFirst);
    -                    }
    -
    -                    bytesToWriteFirst = null;
    -                }
    -
    -                if (out != null) {
    -                    eolBuffer.drainTo(out);
    +                    buffer.write(bytesToWriteFirst);
                     }
    -
    +                bytesToWriteFirst = null;
    +                eolBuffer.drainTo(buffer);
                     eolBuffer.clear();
    +                buffer.write(nextByte);
                 }
     
    -            // if there's an OutputStream to copy the data to, copy it, if 
appropriate.
    -            // "if appropriate" means that it's not a line delimiter or 
that we want to copy line delimiters
    -            bytesRead++;
    -            if (out != null && (includeLineDelimiter || !isNewLineChar)) {
    -                if (bytesToWriteFirst != null) {
    -                    out.write(bytesToWriteFirst);
    -                    bytesToWriteFirst = null;
    +            // check the size limit
    +            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 
0) {
    +                in.reset();
    +                if (buffer != null) {
    +                    buffer.close();
                     }
    -
    -                out.write(nextByte);
    +                return new EndOfLineMarker(-1, eolBuffer, false, 
leadingNewLineBytes);
                 }
     
                 // if we have a new line, then we're done
                 if (nextByte == '\n') {
    -                eolBuffer.addEndOfLine(lastByte == '\r', true);
    +                if (buffer != null) {
    +                    buffer.writeTo(out);
    +                    buffer.close();
    +                    eolBuffer.addEndOfLine(false, true);  //TODO: verify 
"false" is equivalent to "lastByte == '\r'"
    --- End diff --
    
    Is this TODO a reminder to future @markobean?  Does a current unit test 
verify it?


> Enable SplitText processor to limit line length and filter header lines
> -----------------------------------------------------------------------
>
>                 Key: NIFI-1118
>                 URL: https://issues.apache.org/jira/browse/NIFI-1118
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Bean
>            Assignee: Mark Bean
>             Fix For: 0.7.0
>
>
> Add the following functionality to the SplitText processor:
> 1) Maximum size limit of the split file(s)
> A new split file will be created if adding the next line to the current 
> split file would cause it to exceed a user-defined maximum file size.
> 2) Header line marker
> User-defined character(s) can be used to identify the header line(s) of the 
> data file, instead of relying on a predetermined number of lines.
> These changes are additions; they do not replace any existing property or 
> behavior. For the header line marker, the existing property "Header Line Count" 
> must be set to zero for the new property and behavior to take effect.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to