Thanks for the response. It gave me additional context, so I can proceed with more confidence. I've been experimenting with creating additional sessions, and I believe that approach will work for my use case. I will keep the potential duplicate-FlowFile issue in mind, though.
Thanks again.
Devin

On Sun, Mar 13, 2016 at 12:40 PM, Mark Payne <marka...@hotmail.com> wrote:
> Devin,
>
> We do realize that we have some work to do in order to make it so that a single Processor can buffer up hundreds of thousands or more FlowFiles. The SplitText processor is very popular and suffers from this exact same problem. We want to have a mechanism for swapping those FlowFiles out of the Java heap, similar to how we do when we have millions of FlowFiles sitting in a queue. There is a ticket [1] to address this. However, this has turned out to be very time-consuming, and not quite as straightforward as we had hoped, so it has not been finished yet.
>
> In the meantime, you can use the approach that you described, using two different ProcessSessions, by extending AbstractSessionFactoryProcessor instead of AbstractProcessor. The downside to this approach, though, is that when NiFi is restarted, you could potentially have a lot of data duplication.
>
> As an example, let's imagine that you create a ProcessSession, use it to create 10,000 FlowFiles, then commit the session and create a new one. If you have an incoming FlowFile that has 1 million rows in it, you may create 800,000 FlowFiles and send them out, and then NiFi gets restarted. In this case, you will pick up the original FlowFile and begin processing it again, but you've already sent out those 800,000 FlowFiles. Depending on your requirements, this may or may not be acceptable.
>
> One option is simply to document that this behavior exists and that SplitText should be used ahead of your Processor in order to split the content into 10,000-line chunks. This would avoid the heap exhaustion.
>
> Another possible solution, though it's not as pretty as I'd like: create up to 10,000 FlowFiles from an input FlowFile.
> Then, add an attribute to the input FlowFile indicating your progress (for instance, an attribute named "rows.converted"), and then call "session.transfer(flowFile);". This will transfer the FlowFile back into its input queue. You can then commit the session. Then, when you call session.get() to get an input FlowFile again, you can check for that attribute and skip that many rows. This way, you won't end up with data duplication. The downside here is that you would end up reading the first N rows each time and ignoring that content, which can be expensive. A more optimized approach would be to wrap the InputStream in a ByteCountingInputStream, record the number of bytes consumed as an attribute, and then, on each subsequent iteration, use StreamUtils.skip() to skip that many bytes.
>
> I know there's a lot of info here. Let me know if anything doesn't make sense.
>
> I hope this helps!
> -Mark
>
> [1] https://issues.apache.org/jira/browse/NIFI-1008
>
> On Mar 11, 2016, at 5:29 PM, Devin Fisher <devin.fis...@perfectsearchcorp.com> wrote:
>
> > I'm creating a processor that will read a customer CSV and create a new FlowFile, in the form of XML, for each line. The CSV file will be quite large (hundreds of thousands of lines). I would like to commit a reasonable amount from time to time so that those FlowFiles can flow down to other processors. But similar processors, SplitText and SplitXml, save up all the created FlowFiles and release them all at the end. In some trials, I'm running out of memory doing that. But I can't commit the session early, because I'm still reading the original CSV file. Is there a workflow where I can read the incoming CSV FlowFile but still release the created FlowFiles as I go?
> >
> > I'm thinking of not using AbstractProcessor and instead using AbstractSessionFactoryProcessor to create two different sessions. Is that advisable, or even possible?
> >
> > Devin
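Mark's restart scenario can be reproduced with a small model. This is a sketch, not NiFi code: it uses no NiFi classes, the names BatchCommitDuplication and runAttempt are hypothetical, and a session "commit" is modeled as moving a counter from pending to released. It assumes, as in his example, 10,000 FlowFiles per session and a restart after 800,000 rows of a 1,000,000-row input.

```java
/**
 * Toy model (no NiFi classes; all names are hypothetical) of one input FlowFile
 * handled with a fresh ProcessSession per batch of output FlowFiles.
 */
public class BatchCommitDuplication {

    /**
     * Emits one output record per input row and commits after every batchSize records.
     * If crashAtRow >= 0, the attempt stops when it reaches that row, as if NiFi were
     * restarted mid-file. Returns how many records were committed (released downstream).
     */
    static long runAttempt(long totalRows, long batchSize, long crashAtRow) {
        long committed = 0; // records released by earlier batch commits
        long pending = 0;   // records created in the current, not yet committed session
        for (long row = 0; row < totalRows; row++) {
            if (row == crashAtRow) {
                // Restart: the pending batch is rolled back, and the input FlowFile was
                // never acknowledged, so it will be processed again from row 0.
                return committed;
            }
            pending++;
            if (pending == batchSize) {
                committed += pending; // commit this session and start a new one
                pending = 0;
            }
        }
        return committed + pending; // last commit, which would also remove the input
    }

    public static void main(String[] args) {
        long firstAttempt = runAttempt(1_000_000, 10_000, 800_000); // restart after 800,000 rows
        long replay = runAttempt(1_000_000, 10_000, -1);            // original FlowFile picked up again
        long sent = firstAttempt + replay;
        System.out.println("sent=" + sent + ", duplicates=" + (sent - 1_000_000));
    }
}
```

Running main prints "sent=1800000, duplicates=800000": every record committed before the restart is sent again when the input FlowFile is replayed from the start.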
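Mark's first option moves the splitting upstream into SplitText. As a rough model only (this is not SplitText's implementation, and LineChunker and chunk are hypothetical names), the sketch streams the input once and hands off groups of at most 10,000 lines, so the processor that follows never has to create more than 10,000 FlowFiles from one input.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LineChunker {

    /** Streams lines from reader and passes each group of up to linesPerChunk lines to sink. */
    static int chunk(BufferedReader reader, int linesPerChunk, Consumer<List<String>> sink)
            throws IOException {
        List<String> current = new ArrayList<>(linesPerChunk);
        int chunks = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            current.add(line);
            if (current.size() == linesPerChunk) {
                sink.accept(current);                   // one chunk = one FlowFile for the next processor
                current = new ArrayList<>(linesPerChunk);
                chunks++;
            }
        }
        if (!current.isEmpty()) {                       // trailing partial chunk
            sink.accept(current);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 25_000; i++) {
            csv.append("row").append(i).append('\n');
        }
        List<Integer> sizes = new ArrayList<>();
        int n = chunk(new BufferedReader(new StringReader(csv.toString())), 10_000,
                c -> sizes.add(c.size()));
        System.out.println(n + " chunks, sizes " + sizes);
    }
}
```

For a 25,000-line input, main prints "3 chunks, sizes [10000, 10000, 5000]". In this model only the current chunk is held in memory.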
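The "rows.converted" checkpoint Mark describes can be modeled the same way. A minimal sketch, assuming a 10,000-row batch and an in-memory list standing in for the FlowFile content; RowCheckpoint, trigger and the restartBeforeCommit flag are hypothetical, and one session commit is modeled as updating the outputs and the progress attribute together.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (no NiFi classes; names are hypothetical) of the "rows.converted"
 * checkpoint. Each call handles at most BATCH rows, then stores its progress on the
 * input FlowFile's attributes. Outputs and the new attribute value are committed
 * together, so a restart only discards the batch that was in flight.
 */
public class RowCheckpoint {
    static final int BATCH = 10_000;
    static final String PROGRESS_ATTR = "rows.converted";

    /** One onTrigger-style call; returns true once every row has been converted. */
    static boolean trigger(List<String> rows, Map<String, String> attributes,
                           List<String> downstream, boolean restartBeforeCommit) {
        int start = Integer.parseInt(attributes.getOrDefault(PROGRESS_ATTR, "0"));
        int end = Math.min(start + BATCH, rows.size());
        // A real processor would have to read and discard rows [0, start) of the
        // content stream here; that repeated re-reading is the cost Mark mentions.
        List<String> pending = new ArrayList<>(rows.subList(start, end));
        if (restartBeforeCommit) {
            return false; // rolled back: nothing released, attribute unchanged
        }
        // Commit outputs and progress together (a single session commit in NiFi).
        downstream.addAll(pending);
        attributes.put(PROGRESS_ATTR, Integer.toString(end));
        return end == rows.size(); // if false, the input goes back to its own queue
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 25_000; i++) {
            rows.add("row-" + i);
        }
        Map<String, String> attributes = new HashMap<>();
        List<String> downstream = new ArrayList<>();
        trigger(rows, attributes, downstream, false); // converts rows 0..9,999
        trigger(rows, attributes, downstream, true);  // restart while converting 10,000..19,999
        while (!trigger(rows, attributes, downstream, false)) {
            // keep picking the input FlowFile back up until it is finished
        }
        System.out.println(downstream.size() + " records, " + PROGRESS_ATTR + "="
                + attributes.get(PROGRESS_ATTR));
    }
}
```

Here main prints "25000 records, rows.converted=25000": the batch lost to the simulated restart is redone from the last recorded row, and nothing is emitted twice.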
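Mark's byte-offset refinement can be sketched with plain java.io. CountingInputStream and skipFully are hypothetical stand-ins for NiFi's ByteCountingInputStream and StreamUtils.skip(), and the offset returned by convertBatch is what a processor would store as an attribute (the attribute name is up to you). Lines are read one byte at a time so the counter matches exactly what was consumed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Byte-offset checkpoint using only java.io; names are hypothetical. */
public class ByteOffsetCheckpoint {

    /** Counts the bytes actually handed to the caller (stand-in for ByteCountingInputStream). */
    static final class CountingInputStream extends FilterInputStream {
        private long count;

        CountingInputStream(InputStream in) { super(in); }

        @Override public int read() throws IOException {
            int b = super.read();
            if (b != -1) count++;
            return b;
        }

        @Override public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) count += n;
            return n;
        }

        long getCount() { return count; }
    }

    /** Skips exactly n bytes, or throws EOFException if the stream is shorter. */
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                if (in.read() == -1) throw new EOFException("stream ended before offset");
                skipped = 1;
            }
            n -= skipped;
        }
    }

    /** Reads one newline-terminated line byte by byte (no read-ahead); null at EOF. */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            line.write(b);
        }
        if (b == -1 && line.size() == 0) return null;
        return new String(line.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Converts up to maxRows lines starting at startOffset and returns the next offset. */
    static long convertBatch(byte[] content, long startOffset, int maxRows, List<String> out)
            throws IOException {
        try (InputStream raw = new ByteArrayInputStream(content)) {
            skipFully(raw, startOffset); // jump past rows converted in earlier batches
            CountingInputStream counted = new CountingInputStream(raw);
            String line;
            for (int rows = 0; rows < maxRows && (line = readLine(counted)) != null; rows++) {
                out.add(line); // the real processor would emit one XML FlowFile per row here
            }
            return startOffset + counted.getCount();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] csv = "a,1\nb,2\nc,3\nd,4\ne,5\n".getBytes(StandardCharsets.UTF_8);
        List<String> out = new ArrayList<>();
        long offset = 0; // would be persisted on the input FlowFile between invocations
        while (offset < csv.length) {
            offset = convertBatch(csv, offset, 2, out);
        }
        System.out.println(out + " offset=" + offset);
    }
}
```

main prints "[a,1, b,2, c,3, d,4, e,5] offset=20". Against a real content stream, buffering belongs underneath the counter (for example, a BufferedInputStream wrapped by the counter); a BufferedReader placed on top of the counter would read ahead and make the recorded offset too large.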