Devin,

We do realize that we have some work to do in order to make it so that a single Processor can buffer up hundreds of thousands or more FlowFiles. The SplitText processor is very popular and suffers from this exact same problem. We want to have a mechanism for swapping those out of the Java Heap, similar to how we do when we have millions of FlowFiles sitting in a queue. There is a ticket here [1] to address this. However, this has turned out to be very time consuming, and not quite as straightforward as we had hoped, so it's not been finished up yet.
In the meantime, you can use the approach that you described, using two different Process Sessions, by extending AbstractSessionFactoryProcessor instead of AbstractProcessor.

The downside to this approach, though, is that when NiFi is restarted, you could potentially have a lot of data duplication. As an example, let's imagine that you create a ProcessSession and use it to create 10,000 FlowFiles, then commit the session and create a new one. If you have an incoming FlowFile that has 1 million rows in it, you may create 800,000 FlowFiles and send them out, and then NiFi gets restarted. In this case, you will pick up the original FlowFile and begin processing it again, but you've already sent out those 800,000 FlowFiles. Depending on your requirements, this may or may not be acceptable.

One option is simply to document that this behavior exists and that SplitText should be used ahead of your Processor in order to split the content into 10,000-line chunks. This would avoid the heap exhaustion.

Another possible solution, though it's not as pretty as I'd like: process up to 10,000 rows from an input FlowFile. Then add an attribute to the input FlowFile indicating your progress (for instance, an attribute named "rows.converted") and call session.transfer(flowFile). This will transfer the FlowFile back into its input queue. You can then commit the session. Then, when you call session.get() to get an input FlowFile again, you can check for that attribute and skip that many rows. This way, you won't end up with data duplication. The downside here is that you would end up reading and ignoring the first N rows on each iteration, which can be expensive. A more optimized approach would be to wrap the InputStream in a ByteCountingInputStream, record the number of bytes consumed as an attribute, and then for each subsequent iteration use StreamUtils.skip() to skip the appropriate number of bytes.
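To make the byte-offset checkpoint idea concrete, here is a minimal standalone sketch. It is an illustration, not NiFi code: the CountingInputStream wrapper and processBatch method are made-up names, and plain InputStream.skip() stands in for NiFi's StreamUtils.skip(), so the example runs without nifi-api on the classpath. In a real processor you would store the returned offset as a FlowFile attribute before transferring the FlowFile back to its input queue and committing.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CheckpointDemo {

    /** Counts every byte consumed so the offset can later be stored as an attribute. */
    static class CountingInputStream extends FilterInputStream {
        long count;
        CountingInputStream(InputStream in) { super(in); }
        @Override public int read() throws IOException {
            final int b = super.read();
            if (b != -1) count++;
            return b;
        }
        @Override public int read(byte[] buf, int off, int len) throws IOException {
            final int n = super.read(buf, off, len);
            if (n > 0) count += n;
            return n;
        }
    }

    /** Reads up to maxLines lines into out; returns the bytes consumed (the checkpoint). */
    static long processBatch(InputStream raw, int maxLines, StringBuilder out) {
        final CountingInputStream in = new CountingInputStream(raw);
        try {
            int lines = 0, b;
            while (lines < maxLines && (b = in.read()) != -1) {
                out.append((char) b);
                if (b == '\n') lines++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return in.count;
    }

    public static void main(String[] args) throws IOException {
        final byte[] csv = "r1\nr2\nr3\nr4\n".getBytes(StandardCharsets.UTF_8);

        // First invocation: convert a batch of rows, then record the byte offset
        // (in NiFi, this is what you would write into an attribute before
        // session.transfer() and commit).
        final StringBuilder first = new StringBuilder();
        final long offset = processBatch(new ByteArrayInputStream(csv), 2, first);
        System.out.println("offset=" + offset);

        // Next invocation (possibly after a restart): skip the recorded offset
        // and resume, rather than re-reading and re-emitting the first rows.
        final InputStream resumed = new ByteArrayInputStream(csv);
        long skipped = 0;
        while (skipped < offset) {
            final long n = resumed.skip(offset - skipped);
            if (n <= 0) break;
            skipped += n;
        }
        final StringBuilder rest = new StringBuilder();
        processBatch(resumed, Integer.MAX_VALUE, rest);
        System.out.println("rest=" + rest.toString().trim().replace("\n", ","));
    }
}
```

Running this prints an offset of 6 bytes after the first two rows, and the resumed pass emits only "r3" and "r4". The skip loop matters because InputStream.skip() may skip fewer bytes than requested, which is the same reason NiFi provides StreamUtils.skip().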
I know there's a lot of info here. Let me know if anything doesn't make sense. I hope this helps!

-Mark

[1] https://issues.apache.org/jira/browse/NIFI-1008

> On Mar 11, 2016, at 5:29 PM, Devin Fisher <devin.fis...@perfectsearchcorp.com> wrote:
>
> I'm creating a processor that will read a customer csv and will create a new flowfile for each line in the form of XML. The CSV file will be quite large (100s of thousands of lines). I would like to commit a reasonable amount from time to time so that they can flow down to other processors. But looking at similar processors SplitText and SplitXml they save up all the created flowfiles and release them all at the end. In some trials, I'm running out of memory doing that. But I can't commit the session early because I'm still reading the original CSV file. Is there a workflow where I can read the incoming CSV flowfile but still release created flowfiles? I'm thinking of not using AbstractProcessor and instead use AbstractSessionFactoryProcessor and create two different sessions but is that advisable or possible?
>
> Devin