Hello,

Sorry to revive an older thread. I just created a feature ticket <https://issues.apache.org/jira/browse/NIFI-8603> for using non-heap collections in the StandardProcessSession, based on our conversation here. I think it would be beneficial if the session behaved similarly to the NiFi queues by having the ability to store at least some of the FlowFiles in the session off-heap.
Thanks,
Eric

On Tue, Mar 9, 2021 at 10:25 AM Eric Secules <esecu...@gmail.com> wrote:

Hi Mark,

The NPE came after trying to retrieve the parent FlowFile from the outSession's records map. The parent FlowFile was not migrated to outSession. I don't think it's right to migrate the parent FlowFile to the output session as well.

Thanks,
Eric

On Tue, Mar 9, 2021 at 10:02 AM Eric Secules <esecu...@gmail.com> wrote:

Hi Mark,

Thanks for the details! With that in mind I can also make the operation more transactional by using wait and notify (though that won't solve the duplication-on-restart issue).

I tried the example code and I got an NPE inside commit(). I initially thought this was because I was running version 1.11.4. I upgraded to NiFi 1.13.0 and the problem still persists: repoRecord is null on line 786 in StandardProcessSession.java.

Stack trace:

    2021-03-09 17:41:44,013 ERROR [Timer-Driven Process Thread-7] c.m.p.p.MyProcessor MyProcessor[id=f3439a7a-cb5a-3c76-121e-60c448d1f910] Failure FF: null: java.lang.NullPointerException
    java.lang.NullPointerException: null
        at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
        at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
        at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
        at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332)
        ... my code calling outSession.commit()

This may be my issue since my code doesn't look exactly like the example, but it would be good to have a better exception than NPE.

Thanks,
Eric

On Mon., Mar. 8, 2021, 1:30 p.m.
Mark Payne <marka...@hotmail.com> wrote:

Eric,

The session holds onto all "active FlowFiles" as well as other information about them, such as which relationship they've been transferred to, which attributes have been added, removed, etc. So if you have a separate session for each outbound FlowFile, each time that session is committed, all those FlowFiles can be removed from the session and placed in the next queue. This is important because the session holds all FlowFiles it knows about in memory, but the queues will swap the FlowFiles out, writing them to disk and then dropping them from memory/heap. This allows us to hold many millions of FlowFiles within NiFi at a time. However, if you're trying to hold a million FlowFiles in heap, you're going to use a huge amount of heap. This is predominantly due to the HashMap that is used to store attributes. Those can quickly use up a huge amount of heap.

Thanks
-Mark

On Mar 8, 2021, at 12:53 PM, Eric Secules <esecu...@gmail.com> wrote:

Hi Mark,

Thanks for the example code and the considerations. I see the error I made before.
If I were to use the standard way and put this all in one session, what are the memory characteristics? What is tracked in memory for a session, and is this actually any different (in overall memory use) from my approach of using a separate session for output?

Thanks,
Eric

On Sat, Mar 6, 2021 at 1:22 PM Mark Payne <marka...@hotmail.com> wrote:

Hi Eric,

We should definitely throw a better Exception there, rather than NullPointerException. But what you're looking to do can be done. It's slightly more nuanced than that, though. What you'll need to do, in order to maintain the lineage, is to create the child FlowFile in the session that owns the parent.
You can then "migrate" the child FlowFile to a new session. Something to the effect of:

-----------------
ProcessSession inSession = sessionFactory.create();
FlowFile flowFile = inSession.get();
if (flowFile == null) {
    return;
}

ProcessSession outSession = sessionFactory.create();

try (InputStream in = inSession.read(flowFile)) {
    Record record;
    while ((record = getRecord(in)) != null) {
        FlowFile child = inSession.create(flowFile);
        try (OutputStream out = inSession.write(child)) {
            // write some data.
        }

        inSession.migrate(outSession, Collections.singleton(child));
        outSession.transfer(child, REL_SUCCESS);
        outSession.commit();
    }
}

inSession.transfer(flowFile, REL_ORIGINAL);
inSession.commit();
-----------------

Some things to keep in mind, though:
- To get a session factory, you should extend AbstractSessionFactoryProcessor instead of AbstractProcessor.
- This means you need to ensure that you always explicitly commit/rollback sessions and handle Exceptions properly, ensure that you account for any FlowFile that is created, etc.
- If your incoming FlowFile contains 1 million records, and you process 900,000 of them and then NiFi is restarted, then on restart it'll reprocess the incoming FlowFile, so you'll end up with 900,000 duplicates.
- Performance will be very subpar, as you'll be creating and committing up to 1 million sessions per FlowFile. Perhaps this is okay if you only have to process one of these files per 24 hours. But it's worth considering.
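The per-session commit overhead in the last caveat can be reduced by batching: accumulate N children, then migrate and commit them together. A framework-independent sketch of the batching arithmetic (the names and structure here are illustrative, not NiFi APIs; "commit" is a stand-in for an outSession.commit() call):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedCommit {
    // Emits `total` records into `out` and returns how many "commits"
    // occur when flushing in batches of `batchSize`; each flush is
    // analogous to one migrate + transfer + session commit.
    public static int emitWithBatching(int total, int batchSize, List<Integer> out) {
        int commits = 0;
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            batch.add(i);
            if (batch.size() == batchSize) {
                out.addAll(batch);   // analogous to migrating the batch to outSession
                batch.clear();
                commits++;           // analogous to outSession.commit()
            }
        }
        if (!batch.isEmpty()) {      // flush the final partial batch
            out.addAll(batch);
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        int commits = emitWithBatching(1_000_000, 10_000, out);
        // 100 commits instead of 1,000,000 single-record commits
        System.out.println(commits + " commits for " + out.size() + " records");
    }
}
```

With a batch size of 10,000 this turns a million commits into a hundred, at the cost of holding at most one batch of children in the session (and losing at most one batch of progress on failure).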
Thanks
-Mark

On Mar 5, 2021, at 5:02 PM, Eric Secules <esecu...@gmail.com> wrote:

Hi Joe,

I was able to get it working by using one session to manage the parent FlowFile and then one session per split file. I couldn't do splitSession.create(inputFF); I was getting an NPE and another exception. It worked with splitSession.create(), but the downside is that I lose the lineage connection to the parent.

    2021-03-05 20:48:35,435 ERROR [Timer-Driven Process Thread-1] c.m.p.p.MyProcessor MyProcessor[id=0034a60d-0178-1000-7f91-47cf50e242e2] Failure FF: null: java.lang.NullPointerException
    java.lang.NullPointerException: null
        at org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:788)
        at org.apache.nifi.controller.repository.StandardProcessSession.registerForkEvent(StandardProcessSession.java:1852)
        at org.apache.nifi.controller.repository.StandardProcessSession.create(StandardProcessSession.java:1737)

This log is from a 1.11.0 build of NiFi.

In my case the input is a proprietary text file with a gigantic schema definition, which we translate to JSON and split the results all at once. I don't know whether record-based processing works for us because of how fluid the JSON schema is.

Thanks,
Eric

On Fri, Mar 5, 2021 at 1:34 PM Joe Witt <joe.w...@gmail.com> wrote:

Eric,

My point is that it sounds like you get handed an original document which is a JSON document. It contains up to a million elements within it.
You would implement a record reader for your original doc structure, and then you can use any of our current writers, etc. But the important part is avoiding creating splits unless/until totally necessary.

Anyway, if you go the route you're thinking of, I think you'll need a different session for reading (a single session for the entire source file) and a different session for all the splits you'll create. But I might be overcomplicating that. MarkP could give better input.

Thanks

On Fri, Mar 5, 2021 at 2:18 PM Eric Secules <esecu...@gmail.com> wrote:

Hi Joe,

For my use case partial results are okay.
The files may contain up to a million records, but we have about a day to process each one. We will consider record-based processing; it might be a longer task to convert our flows to consume records instead of single files.
Will I need to have multiple sessions to handle all this?

Thanks,
Eric

On Fri, Mar 5, 2021 at 12:30 PM Joe Witt <joe.w...@gmail.com> wrote:

Eric,

The ProcessSession follows a unit-of-work pattern. You can do a lot of things, but until you commit the session it won't actually commit the change(s). So if you want the behavior you describe, call commit after each transfer. This is done automatically for you in most cases, but you can call it to control the boundary. Just remember you risk partial results then. Consider you're reading an input file which contains, let's say, 100 records. On record 51 there is a processing issue. What happens then? I'd also suggest this pattern generally results in poor performance.
Can you not use the record readers/writers to accomplish this so you can avoid turning it into a bunch of tiny FlowFiles?

Thanks

On Fri, Mar 5, 2021 at 1:19 PM Eric Secules <esecu...@gmail.com> wrote:

Hello,

I am trying to write a processor which parses an input file and emits one JSON FlowFile for each record in the input file. Currently we're calling session.transfer() once we encounter a fragment we want to emit, but it's not sending the new FlowFiles to the next processor as it processes the input FlowFile. Instead it's holding everything until the input is fully processed and releasing it all at once. Is there some way I can write the processor to emit FlowFiles as soon as possible rather than waiting for everything to succeed?

Thanks,
Eric
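The behavior discussed in this thread (records released only when the whole input commits vs. emitted as they are parsed) can be illustrated outside NiFi with a callback-driven splitter. The class and method names below are illustrative, not NiFi APIs; the point is only that each record is handed off as soon as it is parsed, the way a per-record session commit would hand a child FlowFile to the next queue:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StreamingSplitter {
    // Parses newline-delimited records and invokes onRecord for each one
    // immediately, rather than collecting them and releasing a batch at
    // the end when the whole input has been read.
    public static int split(String input, Consumer<String> onRecord) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(new StringReader(input))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    onRecord.accept(line); // downstream sees the record right away
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        List<String> emitted = new ArrayList<>();
        int n = split("rec1\nrec2\nrec3", emitted::add);
        System.out.println(n + " records emitted: " + emitted);
    }
}
```

In a real processor the consumer body would be where the child is created, migrated, transferred, and committed; the trade-offs Joe and Mark describe (partial results on failure, duplicates on restart, commit overhead) all live inside that callback.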