I prefer to avoid the callbacks and do this:

InputStream is = session.read(flowFile);
.... some code....
is.close();
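
A slightly safer variant of the same thing is try-with-resources, which closes the stream even if the code in the middle throws. Below is a minimal sketch, not NiFi code: it has to run without NiFi on the classpath, so `openContent` is a hypothetical stand-in for `session.read(flowFile)`, and the class and method names are made up for illustration. What it shows is that each call hands back a fresh stream from the start of the content, so a second pass is just a second call.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

public class ReadTwice {
    // Hypothetical stand-in for session.read(flowFile): every call opens a
    // fresh stream positioned at the start of the same content.
    static Supplier<InputStream> openContent(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        return () -> new ByteArrayInputStream(bytes);
    }

    // Drains the stream and returns the number of bytes read.
    static long countBytes(InputStream in) throws IOException {
        long total = 0;
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }

    // Reads the same content twice, closing the stream after each pass.
    static long[] readTwice(Supplier<InputStream> source) throws IOException {
        long first;
        long second;
        // First pass: the stream is closed when the try block exits.
        try (InputStream is = source.get()) {
            first = countBytes(is);
        }
        // Second pass: a new stream, again starting at the first byte.
        try (InputStream is = source.get()) {
            second = countBytes(is);
        }
        return new long[] { first, second };
    }

    public static void main(String[] args) throws IOException {
        long[] counts = readTwice(openContent("<root><a/><b/></root>"));
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

In a processor, the two `source.get()` calls would become two `session.read(flowFile)` calls on the same flowfile.
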
On Fri, Mar 27, 2020 at 5:22 PM Russell Bateman <r...@windofkeltia.com> wrote:
> Joe,
>
> Ah, thanks. I think I have learned a lot about what's going on down
> inside session.read/write() today. I don't have to stand on my head. For
> completeness, if anyone else looks for this answer, here's my code amended:
>
> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> {
>   FlowFile flowfile = session.get();
>   ...
>
>   // Do some processing on the in-coming flowfile then close its input stream
>   flowfile = session.write( flowfile, new StreamCallback()
>   {
>     @Override
>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>     {
>       ...
>
>       // processing puts some output on the output stream...
>       outputStream.write( etc. );
>
>       inputStream.close();
>     }
>   } );
>
>   // Start over doing different processing on the (same/reopened) in-coming flowfile
>   // continuing to use the (same, also reopened, but appended to) output stream.
>   flowfile = session.write( flowfile, new StreamCallback()
>   {
>     @Override
>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>     {
>       ...
>
>       // processing puts (some more) output on the flowfile's output stream...
>       outputStream.write( etc. );
>     }
>   } );
>
>   session.transfer( flowfile, etc. );
> }
>
> As I'm fond of saying, NiFi just rocks because there's always a solution!
>
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
> > you should be able to call write as many times as you need. just keep
> > using the resulting flowfile reference into the next call.
> >
> > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com>
> > wrote:
> >
> >> Mike,
> >>
> >> Many thanks for responding. Do you mean to say that all I have to do is
> >> something like this?
> >>
> >> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> >> {
> >>   FlowFile flowfile = session.get();
> >>   ...
> >>
> >>   // this will be our resulting flowfile...
> >>   AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
> >>
> >>   /* Do some processing on the in-coming flowfile then close its input stream, but
> >>    * save the output stream for continued use.
> >>    */
> >>   session.write( flowfile, new StreamCallback()
> >>   {
> >>     @Override
> >>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>     {
> >>       savedOutputStream.set( outputStream );
> >>       ...
> >>
> >>       // processing puts some output on the output stream...
> >>       outputStream.write( etc. );
> >>
> >>       inputStream.close();
> >>     }
> >>   } );
> >>
> >>   /* Start over doing different processing on the (same/reopened) in-coming flowfile
> >>    * continuing to use the original output stream. It's our responsibility to close
> >>    * the saved output stream; NiFi closes the unused output stream opened, but
> >>    * ignored by us.
> >>    */
> >>   session.write( flowfile, new StreamCallback()
> >>   {
> >>     @Override
> >>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >>     {
> >>       outputStream = savedOutputStream.get(); // (discard the new output stream)
> >>       ...
> >>
> >>       // processing puts (some more) output on the original output stream...
> >>       outputStream.write( etc. );
> >>
> >>       outputStream.close();
> >>     }
> >>   } );
> >>
> >>   session.transfer( flowfile, etc. );
> >> }
> >>
> >> I'm wondering if this will work to "discard" the new output stream
> >> opened for me (the second time) and replace it with the original one
> >> which was probably closed when the first call to
> >> session.write() finished.
> >> What's on these streams is way too big for me
> >> to put them into temporary memory, say, a ByteArrayOutputStream.
> >>
> >> Russ
> >>
> >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> >>> session.read(FlowFile) just gives you an InputStream. You should be able to
> >>> rerun that as many times as you want provided you properly close it.
> >>>
> >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com>
> >>> wrote:
> >>>
> >>>> In my custom processor, I'm using a SAX parser to process an incoming
> >>>> flowfile that's in XML. Except that this particular XML is in essence
> >>>> two different files, and I would like to split, read and process the
> >>>> first "half" (which starts a couple of lines (XML elements) into the
> >>>> file) not using the SAX parser. At the end, I would stream the output of
> >>>> the first half, then the SAX-processed second half.
> >>>>
> >>>> So, in short:
> >>>>
> >>>>  1. process the incoming flowfile for the early content not using SAX,
> >>>>     but merely copying as-is; at all cost I must avoid "reassembling"
> >>>>     the first half using my SAX handler (what I'm doing now),
> >>>>  2. output the first part down the output stream to the resulting flowfile,
> >>>>  3. (re)process the incoming flowfile using SAX (and I can just skip
> >>>>     over the first bit) and spit the result of this second part out
> >>>>     down the output stream of the resulting flowfile.
> >>>>
> >>>> I guess this is tantamount to asking how, in Java, I can read an input
> >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> >>>> developer question and more a Java question. I have looked at it that
> >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> >>>> would very much like to hear about it.
> >>>>
> >>>> Thanks.
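
For the three-step plan in the original question, here is a rough plain-Java sketch of the two passes. It is only an illustration under assumptions that are not from the thread: the second "half" is assumed to start at a `<body>` element that begins on its own line, the `<doc>`/`<head>`/`<item>` names are invented, a `StringWriter` stands in for the flowfile's output stream, and a `Supplier<InputStream>` stands in for reopening the flowfile content. The NiFi wiring is left out.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class TwoPass {
    // Pass 1: copy lines unchanged until the (assumed) marker line is reached.
    static void copyHead(InputStream in, Writer out, String marker) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null && !line.contains(marker)) {
            out.write(line);
            out.write('\n');
        }
    }

    // Pass 2: SAX-parse the whole document, ignoring events until the given
    // element starts; inside it, report each child element name.
    static void parseTail(InputStream in, Writer out, String element) throws Exception {
        DefaultHandler handler = new DefaultHandler() {
            private boolean inTail = false;

            @Override
            public void startElement(String uri, String localName, String qName, Attributes atts)
                    throws SAXException {
                if (qName.equals(element)) {
                    inTail = true;
                } else if (inTail) {
                    try {
                        out.write("element: " + qName + "\n");
                    } catch (IOException e) {
                        throw new SAXException(e);
                    }
                }
            }

            @Override
            public void endElement(String uri, String localName, String qName) {
                if (qName.equals(element)) {
                    inTail = false;
                }
            }
        };
        SAXParserFactory.newInstance().newSAXParser().parse(in, handler);
    }

    // Made-up sample content; each get() opens a fresh stream over it.
    static Supplier<InputStream> sample() {
        String xml = "<doc>\n<head>\nraw line 1\nraw line 2\n</head>\n"
                   + "<body>\n<item/>\n<item/>\n</body>\n</doc>\n";
        byte[] bytes = xml.getBytes(StandardCharsets.UTF_8);
        return () -> new ByteArrayInputStream(bytes);
    }

    // Runs both passes against the same content, writing to one output.
    static String run(Supplier<InputStream> source) throws Exception {
        StringWriter out = new StringWriter();
        try (InputStream is = source.get()) {
            copyHead(is, out, "<body>");
        }
        try (InputStream is = source.get()) {
            parseTail(is, out, "body");
        }
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.print(run(sample()));
    }
}
```

Neither pass buffers the whole content in memory, which matters here since the streams are said to be too large for a ByteArrayOutputStream.
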