No, this test file is tiny. The real thing is usually megabytes in size.

On Sun, Mar 29, 2020 at 3:15 AM Mike Thomsen <mikerthom...@gmail.com> wrote:
> If these files are only a few MB at the most, you can also just export
> them to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <r...@windofkeltia.com> wrote:
>
>> Joe and Mike,
>>
>> Sadly, I was not able to get very far on this. It seems that to the
>> extent that I copy the first half of the contents of the input stream,
>> I lose what comes after when I try to read again--basically, the second
>> half comprising the <metadata> and <demographics> elements, which I was
>> hoping to SAX-parse. Here's the code and output. I have highlighted the
>> output to make it easier to read.
>>
>>     try
>>     {
>>         InputStream inputStream = session.read( flowfile );
>>         System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>>         System.out.println( StreamUtilities.fromStream( inputStream ) );
>>         inputStream.close();
>>     }
>>     catch( IOException e )
>>     {
>>         e.printStackTrace();
>>     }
>>
>>     flowfile = session.write( flowfile, new StreamCallback()
>>     {
>>         @Override
>>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>         {
>>             System.out.println( "And now, let's copy..." );
>>             CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>>         }
>>     } );
>>
>>     try
>>     {
>>         InputStream inputStream = session.read( flowfile );
>>         System.out.println( "This is the input stream second time around (after copying)..." );
>>         System.out.println( StreamUtilities.fromStream( inputStream ) );
>>         inputStream.close();
>>     }
>>     catch( IOException e )
>>     {
>>         e.printStackTrace();
>>     }
>>
>>     // ...on to the SAX parser, which dies because the input has been truncated to
>>     // exactly what was written out to the output stream
>>
>> Output of the above:
>>
>>     This is the input stream first time around (before copying to output stream)...
>>     <cxml>
>>       <document>
>>       This is the original document.
>>       </document>
>>       <metadata>
>>         <date_of_service>2016-06-28 13:23</date_of_service>
>>       </metadata>
>>       <demographics>
>>         <date_of_birth>1980-07-01</date_of_birth>
>>         <age>36</age>
>>       </demographics>
>>     </cxml>
>>
>>     And now, let's copy...
>>     This is the input stream second time around (after copying)...
>>     <cxml>
>>       <document>
>>       This is the original document.
>>       </document>
>>
>>     And now, we'll go on to the SAX parser...
>>     <cxml> <document> This is the original document. </document>
>>     [pool-1-thread-1] ERROR [...] SAX ruleparser error:
>>     org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
>>     document structures must start and end within the same entity.
>>
>> I left off the code that prints, "And now, we'll go on to the SAX
>> parser..."; it's in the next flowfile = session.write( ... ). I have
>> unit tests that verify the good functioning of
>> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
>> "file" is truncated: SAX finds the first "half" just fine, but there is
>> no second "half". If I comment out copying from input stream to output
>> stream, the error doesn't occur--the whole document is there.
>>
>> Thanks for looking at this again if you can,
>> Russ
>>
>> On 3/27/20 3:08 PM, Joe Witt wrote:
>>> You should be able to call write as many times as you need. Just keep
>>> using the resulting flowfile reference into the next call.
>>>
>>> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com> wrote:
>>>
>>>> Mike,
>>>>
>>>> Many thanks for responding. Do you mean to say that all I have to do
>>>> is something like this?
>>>>
>>>>     public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
>>>>     {
>>>>         FlowFile flowfile = session.get();
>>>>         ...
>>>>         // this will be our resulting flowfile...
>>>>         AtomicReference<OutputStream> savedOutputStream = new AtomicReference<>();
>>>>
>>>>         /* Do some processing on the incoming flowfile, then close its input stream,
>>>>          * but save the output stream for continued use.
>>>>          */
>>>>         session.write( flowfile, new StreamCallback()
>>>>         {
>>>>             @Override
>>>>             public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>             {
>>>>                 savedOutputStream.set( outputStream );
>>>>                 ...
>>>>
>>>>                 // processing puts some output on the output stream...
>>>>                 outputStream.write( etc. );
>>>>
>>>>                 inputStream.close();
>>>>             }
>>>>         } );
>>>>
>>>>         /* Start over doing different processing on the (same/reopened) incoming
>>>>          * flowfile, continuing to use the original output stream. It's our
>>>>          * responsibility to close the saved output stream; NiFi closes the unused
>>>>          * output stream it opened but we ignored.
>>>>          */
>>>>         session.write( flowfile, new StreamCallback()
>>>>         {
>>>>             @Override
>>>>             public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>>>>             {
>>>>                 outputStream = savedOutputStream.get();  // (discard the new output stream)
>>>>                 ...
>>>>
>>>>                 // processing puts (some more) output on the original output stream...
>>>>                 outputStream.write( etc. );
>>>>
>>>>                 outputStream.close();
>>>>             }
>>>>         } );
>>>>
>>>>         session.transfer( flowfile, etc. );
>>>>     }
>>>>
>>>> I'm wondering if this will work to "discard" the new output stream
>>>> opened for me (the second time) and replace it with the original one,
>>>> which was probably closed when the first call to session.write()
>>>> finished. What's on these streams is way too big for me to put into
>>>> temporary memory, say, a ByteArrayOutputStream.
>>>>
>>>> Russ
>>>>
>>>> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>>>>> session.read(FlowFile) just gives you an InputStream. You should be
>>>>> able to rerun that as many times as you want, provided you properly
>>>>> close it.
>>>>>
>>>>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com> wrote:
>>>>>
>>>>>> In my custom processor, I'm using a SAX parser to process an incoming
>>>>>> flowfile that's in XML. Except that this particular XML is in essence
>>>>>> two different files, and I would like to split, read and process the
>>>>>> first "half", which starts a couple of lines (XML elements) into the
>>>>>> file, not using the SAX parser. At the end, I would stream the output
>>>>>> of the first half, then the SAX-processed second half.
>>>>>>
>>>>>> So, in short:
>>>>>>
>>>>>> 1. Process the incoming flowfile for the early content not using SAX,
>>>>>>    but merely copying it as-is; at all cost I must avoid
>>>>>>    "reassembling" the first half using my SAX handler (what I'm doing
>>>>>>    now).
>>>>>> 2. Output the first part down the output stream to the resulting
>>>>>>    flowfile.
>>>>>> 3. (Re)process the incoming flowfile using SAX (and I can just skip
>>>>>>    over the first bit), spitting the result of this second part out
>>>>>>    down the output stream of the resulting flowfile.
>>>>>>
>>>>>> I guess this is tantamount to asking how, in Java, I can read an
>>>>>> input stream twice (or one and a half times). Maybe it's less a NiFi
>>>>>> developer question and more a Java question. I have looked at it that
>>>>>> way too, but if one of you knows (particularly NiFi) best practice, I
>>>>>> would very much like to hear about it.
>>>>>>
>>>>>> Thanks.
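[Editor's note] The closing question--how to read an input stream twice in plain Java--can be sketched with the standard mark()/reset() mechanism. This is a minimal illustration, not the thread's eventual solution: the class and method names here (TwoPassRead, readTwice) are hypothetical, and since BufferedInputStream buffers everything up to the mark limit in memory, this approach carries the same memory cost Russ is trying to avoid for multi-megabyte flowfiles. For NiFi specifically, the thread suggests the more scalable route is a single session.write() callback that copies the first half to the output stream and then hands the *same* InputStream to the SAX parser, so the content is only streamed once.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TwoPassRead
{
    /**
     * Pass 1: read up to headerBytes bytes (the "first half").
     * Then rewind via reset() and read the full content as pass 2.
     * Wrapping in BufferedInputStream gives mark()/reset() support to
     * any stream, at the cost of buffering what is read in memory.
     */
    public static String[] readTwice( InputStream raw, int headerBytes ) throws IOException
    {
        BufferedInputStream in = new BufferedInputStream( raw );
        in.mark( Integer.MAX_VALUE );               // remember the start; buffer grows as needed
        byte[] head = new byte[ headerBytes ];
        int n = in.read( head, 0, headerBytes );    // pass 1: just the header
        String first = new String( head, 0, Math.max( n, 0 ), StandardCharsets.UTF_8 );
        in.reset();                                 // rewind to the mark
        ByteArrayOutputStream whole = new ByteArrayOutputStream();
        byte[] buffer = new byte[ 4096 ];
        for( int count; ( count = in.read( buffer ) ) != -1; )
            whole.write( buffer, 0, count );        // pass 2: everything, from the top
        return new String[] { first, whole.toString( "UTF-8" ) };
    }

    public static void main( String[] args ) throws IOException
    {
        InputStream source = new ByteArrayInputStream(
            "<cxml><document>text</document><metadata/></cxml>".getBytes( StandardCharsets.UTF_8 ) );
        String[] passes = readTwice( source, 6 );
        System.out.println( passes[ 0 ] );   // prints "<cxml>"
        System.out.println( passes[ 1 ] );   // prints the whole document
    }
}
```

Note that session.read() in NiFi returns a fresh stream over the flowfile's *current* content each time, which is why Russ's second read in the thread above saw only the truncated copy after session.write() had replaced the content: re-reading is not the problem, writing in between is.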