Github user markobean commented on a diff in the pull request: https://github.com/apache/nifi/pull/135#discussion_r50989494 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java --- @@ -230,100 +270,112 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final ProcessorLog logger = getLogger(); final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); + final double maxFragmentSize; + if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) { + maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B); + } else { + maxFragmentSize = Integer.MAX_VALUE; + } + final String headerMarker = context.getProperty(HEADER_MARKER).getValue(); final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); - final ObjectHolder<String> errorMessage = new ObjectHolder<>(null); - final ArrayList<SplitInfo> splitInfos = new ArrayList<>(); - final long startNanos = System.nanoTime(); final List<FlowFile> splits = new ArrayList<>(); + session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn); final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - // if we have header lines, copy them into a ByteArrayOutputStream + // Identify header, if any final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); - final int headerLinesCopied = readLines(in, headerCount, headerStream, true); - if (headerLinesCopied < headerCount) { - errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines"); + final SplitInfo headerInfo = readHeader(headerCount, headerMarker, in, headerStream, true); + if 
(headerInfo.lengthLines < headerCount) { + errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + + headerInfo.lengthLines + " lines"); return; } while (true) { - if (headerCount > 0) { - // if we have header lines, create a new FlowFile, copy the header lines to that file, - // and then start copying lines - final IntegerHolder linesCopied = new IntegerHolder(0); - FlowFile splitFile = session.create(flowFile); - try { - splitFile = session.write(splitFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + FlowFile splitFile = session.create(flowFile); + final SplitInfo flowFileInfo = new SplitInfo(); + + // if we have header lines, write them out + // and then start copying lines + try { + splitFile = session.write(splitFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + long lineCount = 0; + long byteCount = 0; + // Process header + if (headerInfo.lengthLines > 0) { + flowFileInfo.lengthBytes = headerInfo.lengthBytes; + byteCount = headerInfo.lengthBytes; headerStream.writeTo(out); - linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines)); + } + + // Process body + while (true) { + final ByteArrayOutputStream dataStream = new ByteArrayOutputStream(); + in.mark(1); --- End diff -- This "in.mark()" is used to roll back the entire line if its inclusion in the current split exceeds maxFragmentSize. (Reset on line 336.) However, its readlimit should not be 1, because reset() is only guaranteed to work while no more than readlimit bytes have been read since the mark; it should be the maximum possible length of a line. Since the length of a given line is unknown, I recommend using "Integer.MAX_VALUE".
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---