Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1116#discussion_r83260507 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -150,548 +145,320 @@ .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); - private List<PropertyDescriptor> properties; - private Set<Relationship> relationships; + private static final List<PropertyDescriptor> properties; + private static final Set<Relationship> relationships; - @Override - protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> properties = new ArrayList<>(); + static { + properties = new ArrayList<>(); properties.add(LINE_SPLIT_COUNT); properties.add(FRAGMENT_MAX_SIZE); properties.add(HEADER_LINE_COUNT); properties.add(HEADER_MARKER); properties.add(REMOVE_TRAILING_NEWLINES); - this.properties = Collections.unmodifiableList(properties); - final Set<Relationship> relationships = new HashSet<>(); + relationships = new HashSet<>(); relationships.add(REL_ORIGINAL); relationships.add(REL_SPLITS); relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); } - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - List<ValidationResult> results = new ArrayList<>(); - - final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 - && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - - results.add(new ValidationResult.Builder() - .subject("Maximum Fragment Size") - .valid(!invalidState) - .explanation("Property must be specified when Line Split Count is 0") - .build() - ); - return results; - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - 
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - - byte[] leadingBytes = leadingNewLineBytes; - int numLines = 0; - long totalBytes = 0L; - for (int i = 0; i < maxNumLines; i++) { - final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); - final long bytes = eolMarker.getBytesConsumed(); - leadingBytes = eolMarker.getLeadingNewLineBytes(); - - if (includeLineDelimiter && out != null) { - if (leadingBytes != null) { - out.write(leadingBytes); - leadingBytes = null; - } - eolBuffer.drainTo(out); - } - totalBytes += bytes; - if (bytes <= 0) { - return numLines; - } - numLines++; - if (totalBytes >= maxByteCount) { - break; - } - } - return numLines; - } - - private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { - long bytesRead = 0L; - final ByteArrayOutputStream buffer; - if (out != null) { - buffer = new ByteArrayOutputStream(); - } else { - buffer = null; - } - byte[] bytesToWriteFirst = leadingNewLineBytes; - - in.mark(Integer.MAX_VALUE); - while (true) { - final int nextByte = in.read(); - - // if we hit end of stream we're done - if (nextByte == -1) { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"? 
- } + private volatile boolean removeTrailingNewLines; - // Verify leading bytes do not violate size limitation - if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) { - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } - // Write leadingNewLines, if appropriate - if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) { - bytesRead += bytesToWriteFirst.length; - buffer.write(bytesToWriteFirst); - bytesToWriteFirst = null; - } - // buffer the output - bytesRead++; - if (buffer != null && nextByte != '\n' && nextByte != '\r') { - if (bytesToWriteFirst != null) { - buffer.write(bytesToWriteFirst); - } - bytesToWriteFirst = null; - eolBuffer.drainTo(buffer); - eolBuffer.clear(); - buffer.write(nextByte); - } + private volatile long maxSplitSize; - // check the size limit - if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) { - in.reset(); - if (buffer != null) { - buffer.close(); - } - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } + private volatile int lineCount; - // if we have a new line, then we're done - if (nextByte == '\n') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - eolBuffer.addEndOfLine(false, true); - } - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); - } + private volatile int headerLineCount; - // Determine if \n follows \r; in either case, end of line has been reached - if (nextByte == '\r') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - in.mark(1); - final int lookAheadByte = in.read(); - if (lookAheadByte == '\n') { - eolBuffer.addEndOfLine(true, true); - return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst); - } else { - in.reset(); - eolBuffer.addEndOfLine(true, false); - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); - } - } - } - } + private volatile String 
headerMarker; - private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize, - final long bufferedBytes) throws IOException { - final SplitInfo info = new SplitInfo(); - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - int lastByte = -1; - info.lengthBytes = bufferedBytes; - long lastEolBufferLength = 0L; - - while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r')) - && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0) - && eolBuffer.length() < maxSize) { - in.mark(1); - final int nextByte = in.read(); - // Check for \n following \r on last line - if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') { - in.reset(); - break; - } - switch (nextByte) { - case -1: - info.endOfStream = true; - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - return info; - case '\r': - eolBuffer.addEndOfLine(true, false); - info.lengthLines++; - info.bufferedBytes = 0; - break; - case '\n': - eolBuffer.addEndOfLine(false, true); - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - break; - default: - if (eolBuffer.length() > 0) { - info.lengthBytes += eolBuffer.length(); - lastEolBufferLength = eolBuffer.length(); - eolBuffer.clear(); - } - info.lengthBytes++; - info.bufferedBytes++; - break; - } - lastByte = nextByte; - } - // if current line exceeds size and not keeping eol characters, remove previously applied eol characters - if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) { - info.lengthBytes -= lastEolBufferLength; - } - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - return info; + @Override + public Set<Relationship> getRelationships() { + return Collections.unmodifiableSet(relationships); } - private int countHeaderLines(final 
ByteCountingInputStream in, - final String headerMarker) throws IOException { - int headerInfo = 0; - - final BufferedReader br = new BufferedReader(new InputStreamReader(in)); - in.mark(Integer.MAX_VALUE); - String line = br.readLine(); - while (line != null) { - // if line is not a header line, reset stream and return header counts - if (!line.startsWith(headerMarker)) { - in.reset(); - return headerInfo; - } else { - headerInfo++; - } - line = br.readLine(); - } - in.reset(); - return headerInfo; + /** + * + */ + @OnScheduled + public void onSchedule(ProcessContext context) { + this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet() + ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false; + this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() + ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; + this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); + this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); + this.headerMarker = context.getProperty(HEADER_MARKER).getValue(); } + /** + * Will split the incoming stream releasing all splits as FlowFile at once. + */ @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ComponentLog logger = getLogger(); - final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); - final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0) - ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger(); - final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() - ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; - final String headerMarker = context.getProperty(HEADER_MARKER).getValue(); - final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); - - final AtomicReference<String> errorMessage = new AtomicReference<>(null); - final ArrayList<SplitInfo> splitInfos = new ArrayList<>(); - - final long startNanos = System.nanoTime(); - final List<FlowFile> splits = new ArrayList<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - - long bufferedPartialLine = 0; - - // if we have header lines, copy them into a ByteArrayOutputStream - final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); - // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property - int headerInfoLineCount = 0; - if (headerCount > 0) { - headerInfoLineCount = headerCount; - } else { - if (headerMarker != null) { - headerInfoLineCount = countHeaderLines(in, headerMarker); + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile != null) { + AtomicBoolean error = new AtomicBoolean(); + List<FlowFile> splitFlowFiles = new ArrayList<>(); + List<SplitInfo> computedSplitsInfo = new ArrayList<>(); + AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + TextLineDemarcator demarcator = new TextLineDemarcator(in); + SplitInfo splitInfo = null; + long startOffset = 0; + + // Compute fragment representing the header (if available) + long start = System.nanoTime(); + try { + 
if (SplitText.this.headerLineCount > 0) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null); + if (splitInfo.lineCount < SplitText.this.headerLineCount) { + error.set(true); + getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required " + + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure."); + } + } else if (SplitText.this.headerMarker != null) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null); } + headerSplitInfoRef.set(splitInfo); + } catch (IllegalStateException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure."); } - final byte[] headerNewLineBytes; - final byte[] headerBytesWithoutTrailingNewLines; - if (headerInfoLineCount > 0) { - final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null); - - if (headerLinesCopied < headerInfoLineCount) { - errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines"); - return; + // Compute and collect fragments representing the individual splits + if (!error.get()) { + if (headerSplitInfoRef.get() != null) { + startOffset = headerSplitInfoRef.get().length; } - - // Break header apart into trailing newlines and remaining text - final byte[] headerBytes = headerStream.toByteArray(); - int headerNewLineByteCount = 0; - for (int i = headerBytes.length - 1; i >= 0; i--) { - final byte headerByte = headerBytes[i]; - - if (headerByte == '\r' || headerByte == '\n') { - headerNewLineByteCount++; - } else { - break; - } + long preAccumulatedLength = startOffset; + while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) { + computedSplitsInfo.add(splitInfo); 
+ startOffset += splitInfo.length; } - - if (headerNewLineByteCount == 0) { - headerNewLineBytes = null; - headerBytesWithoutTrailingNewLines = headerBytes; - } else { - headerNewLineBytes = new byte[headerNewLineByteCount]; - System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount); - - headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount]; - System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount); + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Computed splits in " + (stop - start) + " milliseconds."); } - } else { - headerBytesWithoutTrailingNewLines = null; - headerNewLineBytes = null; } - - while (true) { - if (headerInfoLineCount > 0) { - // if we have header lines, create a new FlowFile, copy the header lines to that file, - // and then start copying lines - final AtomicInteger linesCopied = new AtomicInteger(0); - final AtomicLong bytesCopied = new AtomicLong(0L); - FlowFile splitFile = session.create(flowFile); - try { - splitFile = session.write(splitFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); - final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) { - countingOut.write(headerBytesWithoutTrailingNewLines); - //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already - linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut, - includeLineDelimiter, headerNewLineBytes)); - bytesCopied.set(countingOut.getBytesWritten()); - } - } - }); - splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get())); - splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, 
String.valueOf(bytesCopied.get())); - logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()}); - } finally { - if (linesCopied.get() > 0) { - splits.add(splitFile); + } + }); + if (!error.get()) { + FlowFile headerFlowFile = null; + long headerCrlfLength = 0; + if (headerSplitInfoRef.get() != null) { + headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length); + headerCrlfLength = headerSplitInfoRef.get().trimmedLength; + } + int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme + String fragmentId = UUID.randomUUID().toString(); + + if (computedSplitsInfo.size() == 0) { + FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); + splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(), + fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key())); + splitFlowFiles.add(splitFlowFile); + } else { + for (SplitInfo computedSplitInfo : computedSplitsInfo) { + long length = SplitText.this.removeTrailingNewLines ? 
computedSplitInfo.trimmedLength : computedSplitInfo.length; + boolean proceedWithClone = headerFlowFile != null || length > 0; + if (proceedWithClone) { + FlowFile splitFlowFile = null; + if (headerFlowFile != null) { + if (length > 0) { + splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length); + splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile); } else { - // if the number of content lines is a multiple of the SPLIT_LINE_COUNT, - // the last flow file will contain just a header; don't forward that one - session.remove(splitFile); + splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER } - } - - // Check for EOF - in.mark(1); - if (in.read() == -1) { - break; - } - in.reset(); - - } else { - // We have no header lines, so we can simply demarcate the original File via the - // ProcessSession#clone method. - long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine; - final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine); - if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) { - bufferedPartialLine = info.bufferedBytes; - } - if (info.endOfStream) { - // stream is out of data - if (info.lengthBytes > 0) { - info.offsetBytes = beforeReadingLines; - splitInfos.add(info); - final long procNanos = System.nanoTime() - startNanos; - final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); - logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; " - + "total splits = {}; total processing time = {} ms", - new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); - } - break; } else { - if (info.lengthBytes != 0) { - info.offsetBytes = beforeReadingLines; - info.lengthBytes -= bufferedPartialLine; - splitInfos.add(info); - 
final long procNanos = System.nanoTime() - startNanos; - final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); - logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; " - + "total splits = {}; total processing time = {} ms", - new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); - } + splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length); } + + splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, + computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key())); + splitFlowFiles.add(splitFlowFile); } } } - } - }); - if (errorMessage.get() != null) { - logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()}); - session.transfer(flowFile, REL_FAILURE); - if (!splits.isEmpty()) { - session.remove(splits); + getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." 
: ".")); + if (headerFlowFile != null) { + session.remove(headerFlowFile); + } } - return; - } - if (!splitInfos.isEmpty()) { - // Create the splits - for (final SplitInfo info : splitInfos) { - FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes); - split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines)); - split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes)); - splits.add(split); + if (error.get()) { + session.transfer(flowFile, REL_FAILURE); + } else { + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(splitFlowFiles, REL_SPLITS); } - } - finishFragmentAttributes(session, flowFile, splits); - - if (splits.size() > 10) { - logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()}); } else { - logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits}); + context.yield(); } - - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(splits, REL_SPLITS); } - private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) { - final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); - - final String fragmentId = UUID.randomUUID().toString(); - final ArrayList<FlowFile> newList = new ArrayList<>(splits); - splits.clear(); - for (int i = 1; i <= newList.size(); i++) { - FlowFile ff = newList.get(i - 1); - final Map<String, String> attributes = new HashMap<>(); - attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String.valueOf(i)); - attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size())); - attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename); - FlowFile newFF = session.putAllAttributes(ff, attributes); - splits.add(newFF); - } + /** + * + */ + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + List<ValidationResult> results = new ArrayList<>(); 
+ boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 + && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); + results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState) + .explanation("Property must be specified when Line Split Count is 0").build()); + return results; } - private static class SplitInfo { - - public long offsetBytes; - public long lengthBytes; - public long lengthLines; - public long bufferedBytes; - public boolean endOfStream; - - public SplitInfo() { - this.offsetBytes = 0L; - this.lengthBytes = 0L; - this.lengthLines = 0L; - this.bufferedBytes = 0L; - this.endOfStream = false; - } + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Collections.unmodifiableList(properties); } - public static class EndOfLineBuffer { - private static final byte CARRIAGE_RETURN = (byte) '\r'; - private static final byte NEWLINE = (byte) '\n'; - - private final BitSet buffer = new BitSet(); - private int index = 0; - - public void clear() { - index = 0; - } - - public void addEndOfLine(final boolean carriageReturn, final boolean newLine) { - buffer.set(index++, carriageReturn); - buffer.set(index++, newLine); - } - - private void drainTo(final OutputStream out) throws IOException { - for (int i = 0; i < index; i += 2) { - final boolean cr = buffer.get(i); - final boolean nl = buffer.get(i + 1); - - // we've consumed all data in the buffer - if (!cr && !nl) { - return; - } - - if (cr) { - out.write(CARRIAGE_RETURN); + /** --- End diff -- Should remove blank javadoc lines
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---