Devin,

We do realize that we have some work to do so that a single Processor can
buffer up hundreds of thousands or more FlowFiles. The SplitText processor is
very popular and suffers from this exact same problem. We want to have a
mechanism for swapping those out of the Java heap, similar to what we do when
we have millions of FlowFiles sitting in a queue. There is a ticket here [1]
to address this. However, this has turned out to be very time consuming, and
not quite as straightforward as we had hoped, so it has not been finished yet.

In the meantime, you can use the approach that you described, using two
different Process Sessions, by extending AbstractSessionFactoryProcessor
instead of AbstractProcessor. The downside to this approach, though, is that
when NiFi is restarted, you could potentially have a lot of data duplication.

As an example, let's imagine that you create a ProcessSession and use it to
create 10,000 FlowFiles, then commit the session and create a new one. If you
have an incoming FlowFile that has 1 million rows in it, you may create
800,000 FlowFiles and send them out, and then NiFi gets restarted. In this
case, you will pick up the original FlowFile and begin processing it again.
But you've already sent out those 800,000 FlowFiles. Depending on your
requirements, this may or may not be acceptable.
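
To put numbers on that scenario, here is a small arithmetic sketch in plain
Java. It does not use the NiFi API; the class and method names are made up for
illustration, and it assumes that only fully committed batches reach downstream
and that the original FlowFile is replayed from its first row after a restart.

```java
// Hypothetical helper, not part of NiFi: models the restart scenario above.
class RestartDuplicationSketch {

    /**
     * Rows that were already sent downstream before the restart and will be
     * sent a second time when the input is reprocessed from the beginning.
     * Assumes a partially filled, uncommitted batch is rolled back.
     */
    static long duplicatesAfterRestart(long batchSize, long rowsCreatedBeforeRestart) {
        long committedBatches = rowsCreatedBeforeRestart / batchSize;
        return committedBatches * batchSize;
    }

    /** Total rows that downstream processors see, duplicates included. */
    static long totalDelivered(long totalRows, long batchSize, long rowsCreatedBeforeRestart) {
        return totalRows + duplicatesAfterRestart(batchSize, rowsCreatedBeforeRestart);
    }
}
```

With 1,000,000 input rows, a batch size of 10,000, and a restart after 800,000
rows, duplicatesAfterRestart(10_000, 800_000) gives 800,000 and
totalDelivered(1_000_000, 10_000, 800_000) gives 1,800,000.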

One option that you could use is just to document that this behavior exists
and that SplitText should be used ahead of your Processor in order to split
the content into 10,000-line chunks. This would avoid the heap exhaustion.

Another possible solution that you could use, though it's not as pretty as I'd
like: Process up to 10,000 FlowFiles from an input FlowFile. Then, add an
attribute to the input FlowFile indicating your progress (for instance, add an
attribute named "rows.converted") and then do "session.transfer(flowFile);".
This will transfer the FlowFile back into its input queue. You can then commit
the session. Then, when you call session.get() to get an input FlowFile again,
you can check for that attribute and skip that many rows. This way, you won't
end up with data duplication. The downside here is that you would end up
reading the first N rows each time and ignoring the content, which can be
expensive. A more optimized approach would be to wrap the InputStream in a
ByteCountingInputStream, record the number of bytes consumed, and use that as
an attribute. Then, for each subsequent iteration, use StreamUtils.skip() to
skip the appropriate number of bytes.
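
To make the byte-offset idea concrete, here is a minimal sketch using only
java.io, so it runs without NiFi on the classpath. The class
OffsetCheckpointSketch, its Batch type, and readBatch() are names I made up for
illustration. In a real Processor the skipping and counting would be done with
StreamUtils.skip() and ByteCountingInputStream as described above, and the
offset would be stored in a FlowFile attribute rather than a local variable.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration only; none of these names come from the NiFi API.
class OffsetCheckpointSketch {

    /** One pass over the input: the rows read and the byte offset to resume from. */
    static final class Batch {
        final List<String> rows;
        final long nextOffset;

        Batch(List<String> rows, long nextOffset) {
            this.rows = rows;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Skips startOffset bytes, then reads up to maxRows rows separated by '\n'.
     * The returned nextOffset is the value to checkpoint before re-queuing.
     */
    static Batch readBatch(InputStream raw, long startOffset, int maxRows) throws IOException {
        InputStream in = new BufferedInputStream(raw);

        // Skip the bytes that earlier passes already consumed.
        long toSkip = startOffset;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                // skip() is allowed to return 0, so fall back to a single read.
                if (in.read() < 0) {
                    throw new EOFException("offset is past the end of the content");
                }
                skipped = 1;
            }
            toSkip -= skipped;
        }

        List<String> rows = new ArrayList<>();
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        long offset = startOffset;
        int b;
        while (rows.size() < maxRows && (b = in.read()) >= 0) {
            offset++; // count every byte consumed, including the newline
            if (b == '\n') {
                rows.add(new String(line.toByteArray(), StandardCharsets.UTF_8));
                line.reset();
            } else {
                line.write(b);
            }
        }
        // A final row without a trailing newline still counts as a row.
        if (line.size() > 0) {
            rows.add(new String(line.toByteArray(), StandardCharsets.UTF_8));
        }
        return new Batch(rows, offset);
    }
}
```

Each pass would call readBatch() with the offset saved by the previous pass,
emit the rows, store nextOffset, and stop once a pass returns no rows. The
content before the offset is skipped rather than re-parsed, which is the
saving over counting rows.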

I know there's a lot of info here. Let me know if anything doesn't make sense.

I hope this helps!
-Mark


[1] https://issues.apache.org/jira/browse/NIFI-1008 


> On Mar 11, 2016, at 5:29 PM, Devin Fisher 
> <devin.fis...@perfectsearchcorp.com> wrote:
> 
> I'm creating a processor that will read a customer csv and will create a
> new flowfile for each line in the form of XML. The CSV file will be quite
> large (100s of thousands of lines). I would like to commit a reasonable
> amount from time to time so that they can flow down to other processors.
> But looking at similar processors SplitText and SplitXml they save up all
> the created flowfiles and release them all at the end.  In some trials, I'm
> running out of memory doing that. But I can't commit the session early
> because I'm still reading the original CSV file.  Is there a workflow where
> I can read the incoming CSV flowfile but still release created flowfiles?
> I'm thinking of not using AbstractProcessor and instead
> use AbstractSessionFactoryProcessor and create two different sessions but
> is that advisable or possible?
> 
> Devin
