Geoffrey,

The process session requires you to account for every flow file you have either created or pulled from the queue.
Your logic pulls up to two flow files. If it pulls only one, you return without transferring or removing it, and that is what produces the error above. When you do get two, you read from 1 and 2 and create a third. You now appear to remove 1 and 2 and then transfer the third. That should be fine.

Thanks

On Wed, Nov 18, 2020 at 1:50 PM Greene (US), Geoffrey N <geoffrey.n.gre...@boeing.com> wrote:

> Session.remove()! That’s very helpful, and it makes my numbers come out
> correctly. I’m still getting “transfer relationship not specified”, though.
>
> Here’s where I’m at now:
>
> session.read(flowFile1, {inputStream ->
>     def slurper1 = new groovy.json.JsonSlurper()
>     def json1 = slurper1.parse(inputStream)
> } as InputStreamCallback)
>
> session.read(flowFile2, {inputStream ->
>     def slurper2 = new groovy.json.JsonSlurper()
>     def json2 = slurper2.parse(inputStream)
> } as InputStreamCallback)
>
> def mergedFile = session.create()
> mergedFile = session.write(mergedFile, {outputStream ->
>     outputStream.write("new information".bytes)
> } as OutputStreamCallback)
> session.remove(flowFile1)
> session.remove(flowFile2)
> session.transfer(mergedFile, REL_SUCCESS)
>
> Geoffrey Greene
> Senior Software Ninjaneer
> (703) 414 2421
> The Boeing Company
>
> From: Chris Sampson [mailto:chris.samp...@naimuri.com]
> Sent: Wednesday, November 18, 2020 3:33 PM
> To: users@nifi.apache.org
> Subject: Re: [EXTERNAL] Re: horizontal merge
>
> You may want a call to `session.remove(flowFile1)` instead of transferring
> it.
>
> Cheers,
> Chris Sampson
>
> On Wed, 18 Nov 2020, 20:03 Greene (US), Geoffrey N, <geoffrey.n.gre...@boeing.com> wrote:
>
> I've gotten closer with grabbing two files and processing them.
> I still have something wrong in the paradigm, though. Here's what I've got it
> narrowed down to. (This is in an ExecuteGroovyScript, BTW. I hope to
> translate it to an InvokeScriptedProcessor later on, so I can define the
> transfer end points.)
>
> // get two files, always two. Read file1, and write certain fields to file2
> flowFileList = session.get(2)
> if (flowFileList.size() != 2) return
>
> flowFile1 = flowFileList.get(0)
> flowFile2 = flowFileList.get(1)
>
> if(!flowFile1) return
> if(!flowFile2) return
>
> flowFile1 = session.read(flowFile1, {inputStream ->
>     def slurper = new groovy.json.JsonSlurper()
>     json1 = slurper.parse(inputStream1)
> } as OutputStreamCallback)
>
> flowFile2 = session.write(flowFile2, {outputStream ->
>     outputStream.write("foo plus some data from json1".bytes)
> } as OutputStreamCallback)
>
> // I really don't want to TRANSFER flow file 1, I want it to go AWAY, but
> // I have to do this
> session.transfer(flowFile1, REL_SUCCESS) // << isn't needed
> session.transfer(flowFile2, REL_SUCCESS)
>
> Both files do get read correctly, and output to success, but NiFi always
> throws the error "transfer relationship not specified", which, I
> gather, means that the call to transfer failed because one file (probably
> flowFile1) is not up to date....
>
> Any thoughts? How do you grab two files at once and then transfer them?
>
> I really only want to transfer just the ONE out, since the data was merged
> in, but I can manage with two files if I have to make # inputs = # outputs.
>
> Thanks
>
> -----Original Message-----
> From: Greene (US), Geoffrey N
> Sent: Tuesday, November 17, 2020 8:30 PM
> To: users@nifi.apache.org
> Subject: RE: [EXTERNAL] Re: horizontal merge
>
> The data is actually coming from a REST call, which provides JSON. That
> is pretty smooth at this point.
>
> The challenge seems to be one of associating two different numbers of
> records, and combining them back into one single file (like how do I know
> when I've processed all the records in BOTH files to give one single output
> file). I like your suggestion of rolling back the session and returning,
> though. I will look into that (though it might mean the file has to be
> processed as one single file, rather than handling them as splits/merges).
>
> I've also been playing with MergeRecords and MergeContent, and I might
> be making some progress. My struggle now is trying to figure out when I
> know all records are processed, since I don't have a constant number of
> results to watch for. I may end up writing a file appender, and just
> appending "as I go", so I don't have to do a count.
>
> -----Original Message-----
> From: Matt Burgess [mailto:mattyb...@apache.org]
> Sent: Tuesday, November 17, 2020 4:22 PM
> To: users@nifi.apache.org
> Subject: [EXTERNAL] Re: horizontal merge
>
> Geoffrey,
>
> Where are the two flowfiles coming from? This use case is often handled in
> NiFi using LookupRecord with one of the LookupService implementations
> (REST, RDBMS, CSV, etc.). We don't currently have a mechanism (besides
> scripting) to do enrichment/lookups from flowfiles.
>
> For your script, you can do session.get(2) and then check the size of the
> returned List. If it is less than 2, you can rollback the session and
> return (possibly yielding first if you don't want to check again rapidly).
>
> Regards,
> Matt
>
> On Tue, Nov 17, 2020 at 4:13 PM Greene (US), Geoffrey N <geoffrey.n.gre...@boeing.com> wrote:
> >
> > I’m trying to glue two flow files together HORIZONTALLY.
> > That is,
> >
> > Flowfile1
> >
> > ID,STARTINGLETTER
> > 1,A
> > 2,B
> >
> > And flowfile2:
> >
> > ID, WORD
> > 1,Apple
> > 2, Ball
> > 3, Cat
> >
> > I want it to become:
> >
> > ID, STARTINGLETTER, WORD
> > 1,A,Apple
> > 2,B,Ball
> > 3,,Cat
> >
> > The only way I’ve been able to figure out how to do this is to write a
> > custom “InvokeGroovyProcessor” that takes two flowfiles, reads them both,
> > and then concatenates them.
> >
> > I’m having trouble figuring out how to pop TWO flow files off the
> > queue (the order doesn’t really matter for now), and write one. I’ve
> > tried
> >
> > @Override
> > void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
> >     try {
> >         def session = sessionFactory.createSession()
> >         while (session.getQueueSize() != 2)
> >         {
> >             Thread.sleep(1000)
> >             log.debug("sleeping")
> >         }
> >         // we never get here
> >         log.debug("found two flow files")
> >         // get BOTH flowfiles
> >         def flowFileList = session.get(2)
> >         def flowFile1 = flowFileList.get(0)
> >         def flowFile2 = flowFileList.get(1)
> >         // now do the glue
> >
> > Or is there a better way?
> >
> > Thanks!
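
Pulling the advice in this thread together, a script body for ExecuteGroovyScript might look roughly like the sketch below. It assumes the `session`, `context` and `REL_SUCCESS` bindings that ExecuteGroovyScript provides, that both inputs are JSON, and that the merged output is simply the two parsed documents wrapped in one object. The `first`/`second` keys are placeholders rather than real merge logic, so treat this as a starting point, not a tested implementation.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// session, context and REL_SUCCESS are assumed to be bound by ExecuteGroovyScript.
def flowFiles = session.get(2)
if (flowFiles.size() < 2) {
    // Anything pulled from the queue must still be accounted for:
    // roll it back onto the queue and yield so we don't spin.
    session.rollback()
    context.yield()
    return
}

def flowFile1 = flowFiles[0]
def flowFile2 = flowFiles[1]

// Declared outside the closures so the parsed content is visible afterwards.
def json1 = null
def json2 = null

session.read(flowFile1, { inputStream ->
    json1 = new JsonSlurper().parse(inputStream)
} as InputStreamCallback)

session.read(flowFile2, { inputStream ->
    json2 = new JsonSlurper().parse(inputStream)
} as InputStreamCallback)

// Placeholder merge: wrap both inputs in one JSON object (hypothetical layout).
def merged = session.create()
merged = session.write(merged, { outputStream ->
    outputStream.write(JsonOutput.toJson([first: json1, second: json2]).getBytes('UTF-8'))
} as OutputStreamCallback)

// Both inputs are removed and the new flow file is transferred,
// so every flow file the session touched is accounted for.
session.remove(flowFile1)
session.remove(flowFile2)
session.transfer(merged, REL_SUCCESS)
```

The point to check on every code path is that each flow file obtained or created in the session ends up rolled back, removed, or transferred before the script returns.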