Hello,

Sorry to revive an older thread. I just created a feature ticket
<https://issues.apache.org/jira/browse/NIFI-8603> for using non-heap
collections in the StandardProcessSession, based on our conversation here. I
think it would be beneficial if the session behaved similarly to the NiFi
queues by having the ability to store at least some of the FlowFiles in the
session off-heap.

Thanks,
Eric

On Tue, Mar 9, 2021 at 10:25 AM Eric Secules <esecu...@gmail.com> wrote:

> Hi Mark,
>
> The NPE came after trying to retrieve the parent FlowFile from the
> outSession's records map; the parent FlowFile was not migrated to
> outSession. I don't think it would be right to migrate the parent FlowFile
> to the output session as well.
>
> Thanks,
> Eric
>
> On Tue, Mar 9, 2021 at 10:02 AM Eric Secules <esecu...@gmail.com> wrote:
>
>> Hi Mark,
>>
>> Thanks for the details! With that in mind I can also make the operation
>> more transactional by using wait and notify (though that won't solve the
>> duplication-on-restart issue).
>>
>> I tried the example code and I got an NPE inside commit(). I initially
>> thought this was because I was running version 1.11.4, but after upgrading
>> to NiFi 1.13.0 the problem persists: repoRecord is null on line 786 of
>> StandardProcessSession.java.
>>
>> Stack Trace:
>>
>>> 2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7]
>>> c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910]
>>> Failure FF: null: java.lang.NullPointerException
>>> java.lang.NullPointerException: null
>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>>> at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
>>>
>>> ... My code calling outSession.commit()
>>>
>>
>> This may be my own issue, since my code doesn't look exactly like the
>> example, but it would be good to get a better exception than an NPE.
>>
>> Thanks,
>> Eric
>>
>>
>> On Mon., Mar. 8, 2021, 1:30 p.m. Mark Payne, <marka...@hotmail.com>
>> wrote:
>>
>>> Eric,
>>>
>>> The session holds onto all ‘active’ FlowFiles as well as other
>>> information about them, such as which relationship they’ve been transferred
>>> to, which attributes have been added or removed, etc. So if you have a
>>> separate session for each outbound FlowFile, then each time that session is
>>> committed, all of those FlowFiles can be removed from the session and placed
>>> in the next queue. This is important because the session holds all
>>> FlowFiles it knows about in memory, but the queues will swap FlowFiles
>>> out - writing them to disk and then dropping them from the heap. This
>>> allows NiFi to hold many millions of FlowFiles at a time.
>>> However, if you’re trying to hold a million FlowFiles in the session, you’re
>>> going to use a huge amount of heap, predominantly due to the HashMap that
>>> each FlowFile uses to store its attributes.
>>>
>>> Thanks
>>> -Mark
>>>
>>>
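To make the attribute-heap point above concrete, here is a rough standalone
illustration (assumptions: this is plain Java rather than NiFi code, and the
measurement is order-of-magnitude at best, varying by JVM and attribute
sizes):

-----------------
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Builds one small attribute map per "FlowFile" and reports roughly how
// much heap a million of them consume. Run with e.g. -Xmx2g.
public class AttributeHeapDemo {
    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        System.gc();
        long before = rt.totalMemory() - rt.freeMemory();

        List<Map<String, String>> attributeMaps = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            Map<String, String> attrs = new HashMap<>();
            attrs.put("uuid", "00000000-0000-0000-0000-" + i);
            attrs.put("filename", "record-" + i + ".json");
            attrs.put("fragment.index", String.valueOf(i));
            attributeMaps.add(attrs);
        }

        System.gc();
        long after = rt.totalMemory() - rt.freeMemory();
        System.out.printf("~%d MB of heap for 1M attribute maps%n",
                (after - before) >> 20);
    }
}
-----------------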
>>> > On Mar 8, 2021, at 12:53 PM, Eric Secules <esecu...@gmail.com> wrote:
>>> >
>>> > Hi Mark,
>>> >
>>> > Thanks for the example code and the considerations. I see the error I
>>> > made before.
>>> > If I were to use the standard way and put this all in one session, what
>>> > are the memory characteristics? What is tracked in memory for a session,
>>> > and is this actually any different (in overall memory use) from my
>>> > approach of using a separate session for output?
>>> >
>>> > Thanks,
>>> > Eric
>>> >
>>> > On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <marka...@hotmail.com> wrote:
>>> >
>>> >> Hi Eric,
>>> >>
>>> >> We should definitely throw a better Exception there, rather than
>>> >> NullPointerException. But what you’re looking to do can be done. It’s
>>> >> slightly more nuanced than that, though. What you’ll need to do, in
>>> >> order to maintain the lineage, is to create the child FlowFile in the
>>> >> session that owns the parent. You can then “migrate” the child FlowFile
>>> >> to a new session. Something to the effect of:
>>> >>
>>> >> -----------------
>>> >> ProcessSession inSession = sessionFactory.createSession();
>>> >> FlowFile flowFile = inSession.get();
>>> >> if (flowFile == null) {
>>> >>  return;
>>> >> }
>>> >>
>>> >> ProcessSession outSession = sessionFactory.createSession();
>>> >>
>>> >> try (InputStream in = inSession.read(flowFile)) {
>>> >>  Record record;
>>> >>  while ((record = getRecord(in)) != null) {
>>> >>      // Create the child in the session that owns the parent so the
>>> >>      // FORK lineage back to the parent is preserved.
>>> >>      FlowFile child = inSession.create(flowFile);
>>> >>      try (OutputStream out = inSession.write(child)) {
>>> >>          // write some data.
>>> >>      }
>>> >>
>>> >>      // Hand the child off to its own session so it can be committed
>>> >>      // (and leave the session) without committing the parent.
>>> >>      inSession.migrate(outSession, Collections.singleton(child));
>>> >>      outSession.transfer(child, REL_SUCCESS);
>>> >>      outSession.commit();
>>> >>  }
>>> >> }
>>> >>
>>> >> inSession.transfer(flowFile, REL_ORIGINAL);
>>> >> inSession.commit();
>>> >> -----------------
>>> >>
>>> >> Some things to keep in mind, though:
>>> >> - To get a session factory, you should extend
>>> >> AbstractSessionFactoryProcessor instead of AbstractProcessor (a minimal
>>> >> skeleton is sketched after this list).
>>> >> - This means you need to ensure that you always explicitly commit or
>>> >> roll back sessions, handle Exceptions properly, account for any FlowFile
>>> >> that is created, etc.
>>> >> - If your incoming FlowFile contains 1 million records, and you process
>>> >> 900,000 of them and then NiFi is restarted, then on restart it’ll
>>> >> reprocess the incoming FlowFile, so you’ll end up with 900,000 duplicates.
>>> >> - Performance will be very subpar, as you’ll be creating and committing
>>> >> up to 1 million sessions per FlowFile. Perhaps that’s okay if you only
>>> >> have to process one of these files per 24 hours, but it’s worth
>>> >> considering.
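For reference, a minimal skeleton of the session-factory variant described in
the first point above (assumptions: the class and relationship names are
placeholders, and the per-record loop from the example earlier is elided):

-----------------
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class SplitLargeFileProcessor extends AbstractSessionFactoryProcessor {

    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The unmodified input FlowFile")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_ORIGINAL);
    }

    @Override
    public void onTrigger(final ProcessContext context,
            final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession inSession = sessionFactory.createSession();
        final FlowFile flowFile = inSession.get();
        if (flowFile == null) {
            inSession.rollback();
            return;
        }

        try {
            // ... per-record create/migrate/commit loop as in the example above ...
            inSession.transfer(flowFile, REL_ORIGINAL);
            inSession.commit();
        } catch (final Exception e) {
            // No framework safety net here: roll back explicitly so the
            // input FlowFile is returned to its queue.
            inSession.rollback();
            throw new ProcessException("Failed to split " + flowFile, e);
        }
    }
}
-----------------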
>>> >>
>>> >> Thanks
>>> >> -Mark
>>> >>
>>> >>
>>> >>
>>> >>> On Mar 5, 2021, at 5:02 PM, Eric Secules <esecu...@gmail.com> wrote:
>>> >>>
>>> >>> Hi Joe,
>>> >>>
>>> >>> I was able to get it working by using one session to manage the
>>> >>> parent FlowFile and then one session per split file. I couldn't do
>>> >>> splitSession.create(inputFF), which produced an NPE and another
>>> >>> exception, but it worked with splitSession.create(). The downside is
>>> >>> that I lose the lineage connection to the parent.
>>> >>>
>>> >>>> 2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1]
>>> >>>> c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2]
>>> >>>> Failure FF: null: java.lang.NullPointerException
>>> >>>> java.lang.NullPointerException: null
>>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
>>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
>>> >>>> at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)
>>> >>>
>>> >>> This log is from a 1.11.0 build of NiFi
>>> >>>
>>> >>> In my case the input is a proprietary text file with a gigantic
>>> >>> schema definition, which we translate to JSON and split the results
>>> >>> all at once. I don't know whether record-based processing works for
>>> >>> us because of how fluid the JSON schema is.
>>> >>>
>>> >>> Thanks,
>>> >>> Eric
>>> >>>
>>> >>> On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <joe.w...@gmail.com> wrote:
>>> >>>
>>> >>>> Eric,
>>> >>>>
>>> >>>> My point is that it sounds like the original document you get handed
>>> >>>> is a JSON document containing up to a million elements. You would
>>> >>>> implement a record reader for your original doc structure, and then
>>> >>>> you can use any of our current writers, etc. But the important part
>>> >>>> is avoiding creating splits unless/until totally necessary.
>>> >>>>
>>> >>>> Anyway, if you go the route you're thinking of, I think you'll need
>>> >>>> one session for reading (a single session for the entire source
>>> >>>> file) and a different session for all the splits you'll create. But
>>> >>>> I might be overcomplicating that. MarkP could give better input.
>>> >>>>
>>> >>>> Thanks
>>> >>>>
>>> >>>> On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <esecu...@gmail.com> wrote:
>>> >>>>>
>>> >>>>> Hi Joe,
>>> >>>>>
>>> >>>>> For my use case partial results are okay.
>>> >>>>> The files may contain up to a million records, but we have about a
>>> >>>>> day to process each one. We will consider record-based processing,
>>> >>>>> though it might be a longer task to convert our flows to consume
>>> >>>>> records instead of single files.
>>> >>>>> Will I need multiple sessions to handle all this?
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Eric
>>> >>>>>
>>> >>>>> On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <joe.w...@gmail.com> wrote:
>>> >>>>>
>>> >>>>>> Eric,
>>> >>>>>>
>>> >>>>>> The ProcessSession follows a unit-of-work pattern. You can do a
>>> >>>>>> lot of things, but until you commit the session it won't actually
>>> >>>>>> commit the change(s). So if you want the behavior you describe,
>>> >>>>>> call commit after each transfer. This is done automatically for
>>> >>>>>> you in most cases, but you can call it yourself to control the
>>> >>>>>> boundary. Just remember that you then risk partial results:
>>> >>>>>> consider that you're reading an input file which contains, let's
>>> >>>>>> say, 100 records, and on record 51 there is a processing issue.
>>> >>>>>> What happens then? I'd also suggest that this pattern generally
>>> >>>>>> results in poor performance. Can you not use the record
>>> >>>>>> readers/writers to accomplish this, so you can avoid turning it
>>> >>>>>> into a bunch of tiny FlowFiles?
>>> >>>>>>
>>> >>>>>> Thanks
>>> >>>>>>
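One way to blunt the partial-results and restart-duplication problems raised
here and in Mark's notes above (an approach I'm suggesting, not one from the
thread): stamp each emitted child with its parent's UUID and its position, so
a downstream step can drop replays. A small sketch, with hypothetical
attribute names:

-----------------
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

// Hypothetical helper: give each split enough identity that a downstream
// deduplication step can discard re-emitted children if the parent
// FlowFile is ever reprocessed after a restart.
final class SplitStamper {
    static FlowFile stamp(final ProcessSession session, FlowFile child,
                          final String parentUuid, final long index) {
        child = session.putAttribute(child, "split.parent.uuid", parentUuid);
        return session.putAttribute(child, "split.index", Long.toString(index));
    }
}
-----------------

Downstream, a DetectDuplicate processor keyed on something like
${split.parent.uuid}:${split.index} could then discard the 900,000 replays in
the restart scenario described above.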
>>> >>>>>> On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <esecu...@gmail.com>
>>> >>>> wrote:
>>> >>>>>>>
>>> >>>>>>> Hello,
>>> >>>>>>>
>>> >>>>>>> I am trying to write a processor which parses an input file and
>>> >>>>>>> emits one JSON FlowFile for each record in the input file.
>>> >>>>>>> Currently we're calling session.transfer() once we encounter a
>>> >>>>>>> fragment we want to emit, but it's not sending the new FlowFiles
>>> >>>>>>> to the next processor as it processes the input FlowFile. Instead
>>> >>>>>>> it's holding everything until the input is fully processed and
>>> >>>>>>> releasing it all at once. Is there some way I can write the
>>> >>>>>>> processor to emit FlowFiles as soon as possible rather than
>>> >>>>>>> waiting for everything to succeed?
>>> >>>>>>>
>>> >>>>>>> Thanks,
>>> >>>>>>> Eric
>>> >>>>>>
>>> >>>>
>>> >>
>>> >>
>>>
>>>
