Oh, sorry, I did not understand that you needed to do that.
On March 31, 2020 at 11:22:20, Russell Bateman (r...@windofkeltia.com) wrote:

Yes, I thought of that, but there's no way to insert completing XML
structure into the input stream ahead of <metadata>. SAX will choke if I
just start feeding it the flowfile where I left off from copying up to
</document>.

On 3/30/20 8:25 PM, Otto Fowler wrote:
> Can I ask why you would consume the whole stream when doing the non-SAX
> part? If you consume the stream right up to the SAX part (the stream
> position is at the start of the XML), then can't you just pass the
> stream to SAX as-is?
>
> On March 30, 2020 at 16:23:27, Russell Bateman (r...@windofkeltia.com)
> wrote:
>
> If I haven't worn out my welcome, here is the simplified code that should
> demonstrate either that I have miscoded your suggestions or that the API
> doesn't in fact work as advertised. First, the output. The code, both the
> JUnit test and the processor, is attached and the files are pretty small.
>
> Much thanks,
> Russ
>
> This is the input stream first time around (before copying)
> ===========================================================
>     session.read( flowfile );
> Here's what's in the input stream:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>   <metadata>
>     <date_of_service>2016-06-28 13:23</date_of_service>
>   </metadata>
>   <demographics>
>     <date_of_birth>1980-07-01</date_of_birth>
>     <age>36</age>
>   </demographics>
> </cxml>
>
> And now, let's copy some of the input stream to the output stream
> =================================================================
>     flowfile = session.write( flowfile, new StreamCallback() ... );
> Copying input stream to output stream up to </document>...
> The output stream has in it at this point:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>
> [1. When we examine the output stream, it has what we expect.]
>
> After copying, can we reopen the input stream intact, and does the
> output stream have what we think?
> =================================================================
>     flowfile = session.write( flowfile, new StreamCallback() ... );
> Here's what's in the input stream:
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>
> [2. The input stream as reported just above is truncated by exactly the
> content we did not copy to the output stream. We expected to see the
> entire, original file, but the second half is gone.]
>
> Here's what's in the output stream at this point:
> (nothing)
>
> [3. The content we copied to the output stream has disappeared. Does it
> disappear simply because we looked at it (printed it out here)?]
>
> On 3/29/20 5:05 AM, Joe Witt wrote:
>
> Russell,
>
> I recommend writing very simple code that does two successive read/write
> operations on basic data so you can make sure the API works as expected.
> Then add the XML bits.
>
> Thanks
>
> On Sun, Mar 29, 2020 at 5:15 AM Mike Thomsen <mikerthom...@gmail.com>
> wrote:
>
> If these files are only a few MB at the most, you can also just export
> them to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <r...@windofkeltia.com>
> wrote:
>
> Joe and Mike,
>
> Sadly, I was not able to get very far on this. It seems that, to the
> extent that I copy the first half of the contents of the input stream, I
> lose what comes after when I try to read again: basically, the second
> half comprising the <metadata> and <demographics> elements which I was
> hoping to SAX-parse. Here's the code and output. I have highlighted the
> output to make it easier to read.
> try
> {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream first time around (before copying to output stream)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
> }
> catch( IOException e )
> {
>     e.printStackTrace();
> }
>
> flowfile = session.write( flowfile, new StreamCallback()
> {
>     @Override
>     public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>     {
>         System.out.println( "And now, let's copy..." );
>         CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
>     }
> } );
>
> try
> {
>     InputStream inputStream = session.read( flowfile );
>     System.out.println( "This is the input stream second time around (after copying)..." );
>     System.out.println( StreamUtilities.fromStream( inputStream ) );
>     inputStream.close();
> }
> catch( IOException e )
> {
>     e.printStackTrace();
> }
>
> // ...on to the SAX parser, which dies because the input has been
> // truncated to exactly what was written out to the output stream
>
> Output of above:
>
> This is the input stream first time around (before copying to output
> stream)...
> <cxml>
>   <document>
>     This is the original document.
>   </document>
>   <metadata>
>     <date_of_service>2016-06-28 13:23</date_of_service>
>   </metadata>
>   <demographics>
>     <date_of_birth>1980-07-01</date_of_birth>
>     <age>36</age>
>   </demographics>
> </cxml>
>
> And now, let's copy...
> This is the input stream second time around (after copying)...
> <cxml>
>   <document>
>     This is the original document.
>   </document>
> And now, we'll go on to the SAX parser...
> <cxml> <document> This is the original document. </document>
> [pool-1-thread-1] ERROR [...]
> SAX ruleparser error: org.xml.sax.SAXParseException; lineNumber: 4;
> columnNumber: 14; XML document structures must start and end within the
> same entity.
>
> I left off the code that prints, "And now, we'll go on to the SAX
> parser..."; it's in the next flowfile = session.write( ... ). I have
> unit tests that verify the good functioning of
> copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> "file" is truncated; SAX finds the first "half" just fine, but there is
> no second "half". If I comment out the copying from input stream to
> output stream, the error doesn't occur--the whole document is there.
>
> Thanks for looking at this again if you can,
> Russ
>
> On 3/27/20 3:08 PM, Joe Witt wrote:
>
> You should be able to call write as many times as you need. Just keep
> using the resulting flowfile reference in the next call.
>
> On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com>
> wrote:
>
> Mike,
>
> Many thanks for responding. Do you mean to say that all I have to do is
> something like this?
>
> public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> {
>     FlowFile flowfile = session.get();
>     ...
>
>     // this will be our resulting flowfile...
>     AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
>
>     /* Do some processing on the in-coming flowfile, then close its
>      * input stream, but save the output stream for continued use.
>      */
>     session.write( flowfile, new StreamCallback()
>     {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>             savedOutputStream.set( outputStream );
>             ...
>
>             // processing puts some output on the output stream...
>             outputStream.write( etc. );
>
>             inputStream.close();
>         }
>     } );
>
>     /* Start over doing different processing on the (same/reopened)
>      * in-coming flowfile, continuing to use the original output stream.
>      * It's our responsibility to close the saved output stream; NiFi
>      * closes the unused output stream it opened for us, but which we
>      * ignore.
>      */
>     session.write( flowfile, new StreamCallback()
>     {
>         @Override
>         public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
>         {
>             outputStream = savedOutputStream.get();   // (discard the new output stream)
>             ...
>
>             // processing puts (some more) output on the original output stream...
>             outputStream.write( etc. );
>
>             outputStream.close();
>         }
>     } );
>
>     session.transfer( flowfile, etc. );
> }
>
> I'm wondering if this will work to "discard" the new output stream
> opened for me (the second time) and replace it with the original one,
> which was probably closed when the first call to session.write()
> finished. What's on these streams is way too big for me to put them into
> temporary memory, say, a ByteArrayOutputStream.
>
> Russ
>
> On 3/27/20 10:03 AM, Mike Thomsen wrote:
>
> session.read(FlowFile) just gives you an InputStream. You should be able
> to rerun that as many times as you want provided you properly close it.
>
> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com>
> wrote:
>
> In my custom processor, I'm using a SAX parser to process an incoming
> flowfile that's in XML. Except that this particular XML is in essence
> two different files, and I would like to split, read, and process the
> first "half", which starts a couple of lines (XML elements) into the
> file, not using the SAX parser. At the end, I would stream the output of
> the first half, then the SAX-processed second half.
>
> So, in short:
>
> 1. process the incoming flowfile for the early content not using SAX,
>    but merely copying it as-is; at all costs I must avoid
>    "reassembling" the first half using my SAX handler (what I'm doing
>    now),
> 2.
>    output the first part down the output stream to the resulting
>    flowfile,
> 3. (re)process the incoming flowfile using SAX (and I can just skip
>    over the first bit), spitting the result of this second part out
>    down the output stream of the resulting flowfile.
>
> I guess this is tantamount to asking how, in Java, I can read an input
> stream twice (or one and a half times). Maybe it's less a NiFi developer
> question and more a Java question. I have looked at it that way too,
> but, if one of you knows (particularly NiFi) best practice, I would very
> much like to hear about it.
>
> Thanks.
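The copyCxmlHeaderAndDocumentToOutput() utility discussed in the thread is never shown. For reference, a minimal stand-in, assuming it does nothing more than copy bytes up to and including the closing </document> tag, could look like the sketch below. The class and method names are invented for illustration; only java.io is used, no NiFi classes. The simple matcher is adequate for a marker like "</document>" whose tail does not repeat its own prefix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class CopyUpTo
{
    /**
     * Copies bytes from in to out until the marker (e.g. "</document>") has
     * been written, then returns. The input stream is left positioned on the
     * byte immediately after the marker, so a later stage could, in
     * principle, continue reading from there.
     */
    public static void copyUpToAndIncluding( InputStream in, OutputStream out, String marker )
            throws IOException
    {
        byte[] target = marker.getBytes( StandardCharsets.UTF_8 );
        int matched = 0;                         // how many marker bytes matched so far
        int c;
        while( ( c = in.read() ) != -1 )
        {
            out.write( c );
            if( c == target[matched] )
            {
                matched++;
                if( matched == target.length )
                    return;                      // marker fully copied; stop here
            }
            else
                matched = ( c == target[0] ) ? 1 : 0;   // naive restart on mismatch
        }
    }

    public static void main( String[] args ) throws IOException
    {
        String cxml = "<cxml>\n  <document>\n    text\n  </document>\n  <metadata/>\n</cxml>";
        InputStream in = new ByteArrayInputStream( cxml.getBytes( StandardCharsets.UTF_8 ) );
        ByteArrayOutputStream firstHalf = new ByteArrayOutputStream();
        copyUpToAndIncluding( in, firstHalf, "</document>" );
        System.out.println( firstHalf.toString( "UTF-8" ) );
    }
}
```

Inside a NiFi StreamCallback this would be called with the callback's own input and output streams; everything before and including </document> lands in the new flowfile content, and the rest of the stream remains unread.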
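On the objection that "there's no way to insert completing XML structure into the input stream ahead of <metadata>": plain Java can do exactly that with java.io.SequenceInputStream, which concatenates a synthetic root opener onto the remainder of the stream so that SAX sees a well-formed document again. A sketch, with invented class and method names and sample data taken from the thread:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

public class ParseRemainder
{
    /** Collects element names seen by SAX, just to prove the parse succeeds. */
    public static List<String> parseWithSyntheticRoot( InputStream remainder ) throws Exception
    {
        // Prepend "<cxml>" so the leftover "...</cxml>" tail balances out.
        InputStream patched = new SequenceInputStream(
            new ByteArrayInputStream( "<cxml>".getBytes( StandardCharsets.UTF_8 ) ),
            remainder );
        List<String> seen = new ArrayList<>();
        DefaultHandler handler = new DefaultHandler()
        {
            @Override
            public void startElement( String uri, String local, String qName, Attributes atts )
            {
                seen.add( qName );
            }
        };
        SAXParserFactory.newInstance().newSAXParser().parse( patched, handler );
        return seen;
    }

    public static void main( String[] args ) throws Exception
    {
        // Everything after </document> in the sample file, stray </cxml> included:
        String remainder = "\n  <metadata>\n    <date_of_service>2016-06-28 13:23</date_of_service>\n"
                         + "  </metadata>\n  <demographics>\n    <age>36</age>\n  </demographics>\n</cxml>";
        System.out.println( parseWithSyntheticRoot(
            new ByteArrayInputStream( remainder.getBytes( StandardCharsets.UTF_8 ) ) ) );
    }
}
```

Combined with a copy-up-to-marker pass over the same stream, this would let a single session.write() callback copy the first half verbatim and SAX-parse the second half without ever rereading the flowfile.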
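On the general Java question of reading an input stream twice: when re-obtaining the stream (as session.read() allows in NiFi) is not an option, mark()/reset() on a BufferedInputStream is the standard plain-Java answer, with the caveat that everything consumed before reset() must fit within the declared read limit, which matters for the large files described above. A small sketch with invented names, ASCII data assumed in readAll():

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ReadTwice
{
    /** Drains the stream into a String (byte-per-char; fine for ASCII samples). */
    public static String readAll( InputStream in ) throws IOException
    {
        StringBuilder sb = new StringBuilder();
        int c;
        while( ( c = in.read() ) != -1 )
            sb.append( (char) c );
        return sb.toString();
    }

    public static void main( String[] args ) throws IOException
    {
        InputStream raw = new ByteArrayInputStream( "<cxml>...</cxml>".getBytes( StandardCharsets.UTF_8 ) );
        BufferedInputStream in = new BufferedInputStream( raw );
        in.mark( 1 << 20 );              // remember position 0; buffer up to 1 MiB
        String first  = readAll( in );   // first pass
        in.reset();                      // rewind to the mark
        String second = readAll( in );   // second pass sees identical bytes
        System.out.println( first.equals( second ) );
    }
}
```

For content too big to buffer, the thread's own advice stands: call session.read()/session.write() again and keep using the flowfile reference each call returns, rather than caching streams across callbacks.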