Geoffrey

The process session requires you to account for every flow file you have
either created or pulled from the queue.

Your logic pulls up to two flow files.  If it pulls only one, you return
without transferring or removing it, and that is what produces the error
above.
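
For that short-batch case, here is a minimal sketch of what Matt suggested
further down the thread (roll back, optionally yield, then return).
`takePair` is a hypothetical helper name, not part of NiFi, and `session` and
`context` are assumed to be the variables ExecuteGroovyScript binds for the
script.

```groovy
// Hypothetical helper, not part of NiFi: returns the two flow files to merge,
// or null when fewer than two were available.
def takePair(session, context) {
    def flowFiles = session.get(2)      // ProcessSession.get(int): up to 2 flow files
    if (flowFiles.size() < 2) {
        session.rollback()              // hand back whatever was pulled
        context.yield()                 // optional: back off before the next run
        return null
    }
    return flowFiles
}
```

In the script body this would be used roughly as `def pair = takePair(session,
context)` followed by `if (pair == null) return`, so the session never ends
with a pulled flow file that was neither transferred nor removed.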

When you do get two items, you read from 1 and 2 and create a 3rd.  You
now appear to remove 1 and 2 and then transfer the 3rd.  That should be fine.
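
As for what goes into the 3rd flow file, here is a rough sketch of the
horizontal merge itself. It assumes each input parses (for example with
JsonSlurper) to a list of maps sharing an ID field; `mergeById` is a made-up
name for illustration, and the sample records are the ones from the original
question.

```groovy
// Hypothetical helper: full outer join of two record lists on a shared key.
// Records are plain maps, as JsonSlurper returns for a JSON array of objects.
List<Map> mergeById(List<Map> left, List<Map> right, String key = 'ID') {
    def merged = new LinkedHashMap()                 // keeps first-seen ID order
    (left + right).each { record ->
        // Groovy's Map.get(key, default) stores the default when the key is absent
        merged.get(record[key], [:]).putAll(record)
    }
    return merged.values().toList()
}

def letters = [[ID: 1, STARTINGLETTER: 'A'], [ID: 2, STARTINGLETTER: 'B']]
def words   = [[ID: 1, WORD: 'Apple'], [ID: 2, WORD: 'Ball'], [ID: 3, WORD: 'Cat']]
def merged  = mergeById(letters, words)
assert merged[0] == [ID: 1, STARTINGLETTER: 'A', WORD: 'Apple']
assert merged[2] == [ID: 3, WORD: 'Cat']             // no STARTINGLETTER for ID 3
```

Note that ID 3 simply has no STARTINGLETTER key, so whatever writes the
merged output has to decide how to render the missing value.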

Thanks

On Wed, Nov 18, 2020 at 1:50 PM Greene (US), Geoffrey N <
geoffrey.n.gre...@boeing.com> wrote:

> Session.remove()!  That’s very helpful, and it makes my numbers come out
> correctly.  I’m still getting “transfer relationship not specified”, though.
>
> Here’s where I’m at now:
>
>
>
> session.read(flowFile1, {inputStream ->
>     def slurper1 = new groovy.json.JsonSlurper()
>     def json1 = slurper1.parse(inputStream)
> } as InputStreamCallback)
>
> session.read(flowFile2, {inputStream ->
>     def slurper2 = new groovy.json.JsonSlurper()
>     def json2 = slurper2.parse(inputStream)
> } as InputStreamCallback)
>
> def mergedFile = session.create()
> mergedFile = session.write(mergedFile, {outputStream ->
>     outputStream.write("new information".bytes)
> } as OutputStreamCallback)
> session.remove(flowFile1)
> session.remove(flowFile2)
> session.transfer(mergedFile, REL_SUCCESS)
>
>
>
>
>
>
>
> Geoffrey Greene
>
> Senior Software Ninjaneer
>
> (703) 414 2421
>
> The Boeing Company
>
>
>
> *From:* Chris Sampson [mailto:chris.samp...@naimuri.com]
> *Sent:* Wednesday, November 18, 2020 3:33 PM
> *To:* users@nifi.apache.org
> *Subject:* Re: [EXTERNAL] Re: horizontal merge
>
>
>
>
>
>
>
> You may want a call to `session.remove(flowFile1)` instead of transferring
> it.
>
>
>
> Cheers,
>
> Chris Sampson
>
>
>
> On Wed, 18 Nov 2020, 20:03 Greene (US), Geoffrey N, <
> geoffrey.n.gre...@boeing.com> wrote:
>
> I've gotten closer with grabbing two files and processing them.  I still
> have something wrong in the paradigm though.  Here's what I've got it
> narrowed down to (This is in an ExecuteGroovyScript, BTW.  I hope to
> translate it to an InvokeScriptedProcessor later on, so I can define the
> transfer end points)
>
> // get two files, always two.  Read file1, and write certain fields to
> // file2
> flowFileList = session.get(2)
> if (flowFileList.size() != 2)  return
>
> flowFile1 = flowFileList.get(0)
> flowFile2 = flowFileList.get(1)
>
> if(!flowFile1) return
> if(!flowFile2) return
>
> // session.read with an InputStreamCallback returns void, so nothing is
> // assigned back to flowFile1
> session.read(flowFile1, {inputStream ->
>         def slurper = new groovy.json.JsonSlurper()
>         json1 = slurper.parse(inputStream)
> }  as InputStreamCallback)
>
> flowFile2 = session.write(flowFile2, {outputStream ->
>         outputStream.write("foo plus some data from json1".bytes)
> }  as OutputStreamCallback)
>
> // I really don't want to TRANSFER flow file 1, I want it to go AWAY, but
> // I have to do this
> session.transfer(flowFile1, REL_SUCCESS) // << isn't needed
> session.transfer(flowFile2, REL_SUCCESS)
>
>
> Both files do get read correctly and are output to success, but NiFi always
> throws the error "transfer relationship not specified", which, I
> gather, means that the call to transfer failed because one file (probably
> flowFile1) is not up to date....
>
> Any thoughts?  How do you grab two files at once and then transfer them?
>
> I really only want to transfer just the ONE out, since the data was merged
> in, but I can manage with two files if I have to make # inputs = # outputs
>
> Thanks
>
>
> -----Original Message-----
> From: Greene (US), Geoffrey N
> Sent: Tuesday, November 17, 2020 8:30 PM
> To: users@nifi.apache.org
> Subject: RE: [EXTERNAL] Re: horizontal merge
>
> The data is actually coming from a REST call, which provides JSON.  That
> is pretty smooth at this point.
>
> The challenge seems to be one of associating two different numbers of
> records and combining them back into one single file (for example, how do
> I know when I've processed all the records in BOTH files so I can give one
> single output file?).  I like your suggestion of rolling back the session
> and returning, though, and I will look into that (though it might mean the
> file has to be processed as one single file, rather than handling them as
> splits/merges).
>
> I've also been playing with MergeRecord and MergeContent, and I might
> be making some progress.  My struggle now is trying to figure out when I
> know all records are processed, since I don't have a constant number of
> results to watch for. I may end up writing a file appender, and just
> appending "as I go", so I don't have to do a count.
>
> -----Original Message-----
> From: Matt Burgess [mailto:mattyb...@apache.org]
> Sent: Tuesday, November 17, 2020 4:22 PM
> To: users@nifi.apache.org
> Subject: [EXTERNAL] Re: horizontal merge
>
>
> Geoffrey,
>
> Where are the two flowfiles coming from? This use case is often handled in
> NiFi using LookupRecord with one of the LookupService implementations
> (REST, RDBMS, CSV, etc.). We don't currently have a mechanism (besides
> scripting) to do enrichment/lookups from flowfiles.
>
> For your script, you can do session.get(2) and then check the size of the
> returned List. If it is less than 2, you can rollback the session and
> return (possibly yielding first if you don't want to check again rapidly).
>
> Regards,
> Matt
>
> On Tue, Nov 17, 2020 at 4:13 PM Greene (US), Geoffrey N <
> geoffrey.n.gre...@boeing.com> wrote:
> >
> > I’m trying to glue two flow files together HORIZONTALLY.  That is,
> >
> > Flowfile1
> >
> > ID,STARTINGLETTER
> >
> > 1,A
> >
> > 2,B
> >
> >
> >
> > And flowfile2:
> >
> > ID, WORD
> >
> > 1,Apple
> >
> > 2, Ball
> >
> > 3, Cat
> >
> >
> >
> > I want it to become:
> >
> > ID, STARTINGLETTER, WORD
> >
> > 1,A,Apple
> >
> > 2,B,Ball
> >
> > 3,,Cat
> >
> >
> >
> > The only way I’ve been able to figure out how to do this is to write a
> > custom “InvokeGroovyProcessor” that takes two flowfiles, reads them both,
> > and then concatenates them.
> >
> >
> >
> > I’m having trouble figuring out how to pop TWO flow files off the
> > queue, (The order doesn’t really matter for now), and write one. I’ve
> > tried
> >
> >
> >
> >     @Override
> >     void onTrigger(ProcessContext context, ProcessSessionFactory
> >             sessionFactory) throws ProcessException {
> >         try {
> >             def session = sessionFactory.createSession()
> >             while (session.getQueueSize() != 2) {
> >                 Thread.sleep(1000)
> >                 log.debug("sleeping")
> >             }
> >             // we never get here
> >             log.debug("found two flow files")
> >             // get BOTH flowfiles
> >             def flowFileList = session.get(2)
> >             def flowFile1 = flowFileList.get(0)
> >             def flowFile2 = flowFileList.get(1)
> >             // now do the glue
> >
> >
> >
> > Or Is there a better way?
> >
> > Thanks!
> >
> >
> >
> >
> >
> >
>
>
