No, this test file is tiny. The real thing is usually megabytes in size.
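
A minimal sketch of the in-memory approach suggested in the quoted message below, assuming the content fits comfortably in heap (fine for the tiny test file; larger content may call for a different approach). `BufferTwiceSketch` and `drain` are hypothetical names used for illustration only, not NiFi or JDK API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class BufferTwiceSketch
{
  /** Drain the stream into a byte array so the content can be re-read any number of times. */
  static byte[] drain( InputStream in ) throws IOException
  {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[ 8192 ];
    int count;
    while( ( count = in.read( chunk ) ) != -1 )
      buffer.write( chunk, 0, count );
    return buffer.toByteArray();
  }

  public static void main( String[] args ) throws IOException
  {
    // Stand-in for the flowfile content; in a processor this would come from the input stream.
    InputStream original = new ByteArrayInputStream(
        "<cxml><document>text</document></cxml>".getBytes( StandardCharsets.UTF_8 ) );
    byte[] content = drain( original );

    // Each pass gets its own fresh stream over the same bytes.
    InputStream firstPass  = new ByteArrayInputStream( content );
    InputStream secondPass = new ByteArrayInputStream( content );
    System.out.println( new String( drain( firstPass ), StandardCharsets.UTF_8 ) );
    System.out.println( new String( drain( secondPass ), StandardCharsets.UTF_8 ) );
  }
}
```

The trade-off is that the whole content sits in memory at once, which is the concern raised elsewhere in this thread for the larger files.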

On Sun, Mar 29, 2020 at 3:15 AM Mike Thomsen <mikerthom...@gmail.com> wrote:

> If these files are only a few MB at the most, you can also just export them
> to a ByteArrayOutputStream. Just a thought.
>
> On Sun, Mar 29, 2020 at 12:16 AM Russell Bateman <r...@windofkeltia.com>
> wrote:
>
> > Joe and Mike,
> >
> > Sadly, I was not able to get very far on this. It seems that, to the
> > extent that I copy the first half of the contents of the input stream, I
> > lose what comes after when I try to read again: basically, the second
> > half comprising the <metadata> and <demographics> elements, which I was
> > hoping to SAX-parse. Here's code and output. I have highlighted the
> > output to make it easier to read.
> >
> > try
> > {
> >   InputStream inputStream = session.read( flowfile );
> >   System.out.println( "This is the input stream first time around (before copying to output stream)..." );
> >   System.out.println( StreamUtilities.fromStream( inputStream ) );
> >   inputStream.close();
> > }
> > catch( IOException e )
> > {
> >   e.printStackTrace();
> > }
> > flowfile = session.write( flowfile, new StreamCallback()
> > {
> >   @Override
> >   public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> >   {
> >     System.out.println( "And now, let's copy..." );
> >     CxmlStreamUtilities.copyCxmlHeaderAndDocumentToOutput( inputStream, outputStream );
> >   }
> > } );
> > try
> > {
> >   InputStream inputStream = session.read( flowfile );
> >   System.out.println( "This is the input stream second time around (after copying)..." );
> >   System.out.println( StreamUtilities.fromStream( inputStream ) );
> >   inputStream.close();
> > }
> > catch( IOException e )
> > {
> >   e.printStackTrace();
> > }
> > // ...on to SAX parser which dies because the input has been truncated to
> > // exactly what was written out to the output stream
> >
> >
> > Output of above:
> >
> > This is the input stream first time around (before copying to output
> > stream)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> >    <metadata>
> >      <date_of_service>2016-06-28 13:23</date_of_service>
> >    </metadata>
> >    <demographics>
> >      <date_of_birth>1980-07-01</date_of_birth>
> >      <age>36</age>
> >    </demographics>
> > </cxml>
> >
> > And now, let's copy...
> > This is the input stream second time around (after copying)...
> > <cxml>
> >    <document>
> >      This is the original document.
> >    </document>
> > And now, we'll go on to the SAX parser...
> > <cxml> <document> This is the original document. </document>
> > [pool-1-thread-1] ERROR [...] SAX ruleparser error:
> > org.xml.sax.SAXParseException; lineNumber: 4; columnNumber: 14; XML
> > document structures must start and end within the same entity.
> >
> >
> > I left off the code that prints, "And now, we'll go on to the SAX
> > parser..." It's in the next flowfile = session.write( ... ). I have unit
> > tests that verify the good functioning of
> > copyCxmlHeaderAndDocumentToOutput(). The SAX error occurs because the
> > "file" is truncated; SAX finds the first "half" just fine, but there is
> > no second "half". If I comment out copying from input stream to output
> > stream, the error doesn't occur--the whole document is there.
> >
> > Thanks for looking at this again if you can,
> > Russ
> >
> > On 3/27/20 3:08 PM, Joe Witt wrote:
> > > you should be able to call write as many times as you need.  just keep
> > > using the resulting flowfile reference into the next call.
> > >
> > > On Fri, Mar 27, 2020 at 5:06 PM Russell Bateman <r...@windofkeltia.com>
> > > wrote:
> > >
> > >> Mike,
> > >>
> > >> Many thanks for responding. Do you mean to say that all I have to do is
> > >> something like this?
> > >>
> > >>      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
> > >>      {
> > >>         FlowFile flowfile = session.get();
> > >>         ...
> > >>
> > >>         // this will be our resulting flowfile...
> > >>         AtomicReference< OutputStream > savedOutputStream = new AtomicReference<>();
> > >>
> > >>         /* Do some processing on the in-coming flowfile then close its input stream, but
> > >>          * save the output stream for continued use.
> > >>          */
> > >>         session.write( flowfile, new StreamCallback()
> > >>         {
> > >>           @Override
> > >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> > >>           {
> > >>             savedOutputStream.set( outputStream );
> > >>             ...
> > >>
> > >>             // processing puts some output on the output stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             inputStream.close();
> > >>           }
> > >>         } );
> > >>
> > >>         /* Start over doing different processing on the (same/reopened) in-coming flowfile
> > >>          * continuing to use the original output stream. It's our responsibility to close
> > >>          * the saved output stream; NiFi closes the unused output stream opened, but
> > >>          * ignored by us.
> > >>          */
> > >>         session.write( flowfile, new StreamCallback()
> > >>         {
> > >>           @Override
> > >>           public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
> > >>           {
> > >>             outputStream = savedOutputStream.get(); // (discard the new output stream)
> > >>             ...
> > >>
> > >>             // processing puts (some more) output on the original output stream...
> > >>             outputStream.write( etc. );
> > >>
> > >>             outputStream.close();
> > >>           }
> > >>         } );
> > >>
> > >>         session.transfer( flowfile, etc. );
> > >>      }
> > >>
> > >> I'm wondering if this will work to "discard" the new output stream
> > >> opened for me (the second time) and replace it with the original one
> > >> which was probably closed when the first call to
> > >> session.write() finished. What's on these streams is way too big for me
> > >> to put them into temporary memory, say, a ByteArrayOutputStream.
> > >>
> > >> Russ
> > >>
> > >> On 3/27/20 10:03 AM, Mike Thomsen wrote:
> > >>> session.read(FlowFile) just gives you an InputStream. You should be able to
> > >>> rerun that as many times as you want provided you properly close it.
> > >>>
> > >>> On Fri, Mar 27, 2020 at 11:25 AM Russell Bateman <r...@windofkeltia.com>
> > >>> wrote:
> > >>>
> > >>>> In my custom processor, I'm using a SAX parser to process an incoming
> > >>>> flowfile that's in XML. Except that this particular XML is in essence
> > >>>> two different files, and I would like to split off, read and process the
> > >>>> first "half" (which starts a couple of lines, i.e. XML elements, into the
> > >>>> file) without using the SAX parser. At the end, I would stream the output
> > >>>> of the first half, then the SAX-processed second half.
> > >>>>
> > >>>> So, in short:
> > >>>>
> > >>>>    1. process the incoming flowfile for the early content not using SAX,
> > >>>>       but merely copying as-is; at all costs I must avoid "reassembling"
> > >>>>       the first half using my SAX handler (what I'm doing now),
> > >>>>    2. output the first part down the output stream to the resulting flowfile,
> > >>>>    3. (re)process the incoming flowfile using SAX (and I can just skip
> > >>>>       over the first bit), spitting the result of this second part out
> > >>>>       down the output stream of the resulting flowfile.
> > >>>>
> > >>>> I guess this is tantamount to asking how, in Java, I can read an input
> > >>>> stream twice (or one-half plus one times). Maybe it's less a NiFi
> > >>>> developer question and more a Java question. I have looked at it that
> > >>>> way too, but, if one of you knows (particularly NiFi) best practice, I
> > >>>> would very much like to hear about it.
> > >>>>
> > >>>> Thanks.
> > >>>>
> > >>>>
> > >>
> >
> >
>
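
The three steps listed in the original question (copy the first half as-is, then re-read the content and SAX-parse the rest) can be sketched in plain Java. This is only an illustration under stated assumptions: `TwoPassSketch`, `Reopenable`, `copyThrough`, `summarizeRest` and the `name=value` output format are hypothetical names invented for this sketch, and `Reopenable.open()` merely stands in for whatever reopens the same content (in NiFi, presumably `session.read( flowfile )`). As the output quoted above suggests, writing into the same flowfile leaves it holding only what was written, so a real processor would likely need to send its output somewhere other than the flowfile it is still reading; that part is not shown.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.DefaultHandler;

class TwoPassSketch
{
  /** Hypothetical stand-in for a source that can be opened repeatedly. */
  interface Reopenable
  {
    InputStream open() throws IOException;
  }

  /** Pass 1: copy lines verbatim up to and including the one containing endMarker. */
  static void copyThrough( InputStream in, OutputStream out, String endMarker ) throws IOException
  {
    BufferedReader reader = new BufferedReader( new InputStreamReader( in, StandardCharsets.UTF_8 ) );
    String line;
    while( ( line = reader.readLine() ) != null )
    {
      out.write( ( line + "\n" ).getBytes( StandardCharsets.UTF_8 ) );
      if( line.contains( endMarker ) )
        break;
    }
  }

  /** Pass 2: SAX-parse the whole document; emit name=value for text elements after skipElement closes. */
  static void summarizeRest( InputStream in, OutputStream out, final String skipElement ) throws Exception
  {
    final StringBuilder result = new StringBuilder();
    DefaultHandler handler = new DefaultHandler()
    {
      private boolean pastFirstHalf = false;
      private final StringBuilder text = new StringBuilder();

      @Override
      public void startElement( String uri, String localName, String qName, Attributes attributes )
      {
        text.setLength( 0 );
      }

      @Override
      public void characters( char[] ch, int start, int length )
      {
        text.append( ch, start, length );
      }

      @Override
      public void endElement( String uri, String localName, String qName )
      {
        String value = text.toString().trim();
        if( pastFirstHalf && !value.isEmpty() )
          result.append( qName ).append( '=' ).append( value ).append( '\n' );
        if( qName.equals( skipElement ) )
          pastFirstHalf = true;   // everything from here on belongs to the second half
        text.setLength( 0 );
      }
    };
    SAXParserFactory.newInstance().newSAXParser().parse( in, handler );
    out.write( result.toString().getBytes( StandardCharsets.UTF_8 ) );
  }

  /** Open the source twice: once for the verbatim copy, once for the SAX pass. */
  static void process( Reopenable source, OutputStream out ) throws Exception
  {
    try( InputStream first = source.open() )
    {
      copyThrough( first, out, "</document>" );
    }
    try( InputStream second = source.open() )
    {
      summarizeRest( second, out, "document" );
    }
  }

  public static void main( String[] args ) throws Exception
  {
    final byte[] xml = ( "<cxml>\n  <document>\n    This is the original document.\n  </document>\n"
        + "  <metadata>\n    <date_of_service>2016-06-28 13:23</date_of_service>\n  </metadata>\n</cxml>\n" )
        .getBytes( StandardCharsets.UTF_8 );
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    process( () -> new ByteArrayInputStream( xml ), out );
    System.out.print( new String( out.toByteArray(), StandardCharsets.UTF_8 ) );
  }
}
```

The point of the sketch is that each pass gets a freshly opened stream over unchanged content, and both passes append to one output that is separate from the source.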
