Thanks for your suggestions, Matt.

I decided to keep the original flowfile only upon failure. So, on success, the result is the embedded-document flowfile plus the serialized POJOs created from processing the non-embedded-document part. (Condensed code at end...)

Now I have three questions...

1. I seem not to have placated NiFi with the assurance that I have transferred or disposed of all three flowfiles suitably. I get:

java.lang.AssertionError: org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot commit session because the following FlowFiles have not been removed or transferred: [2]

*Which of the three flowfiles does [2] refer to? Or does it just mean I botched two flowfiles?*

2. session.clone() generates a new flowfile with the identical uuid. I don't think I want the result to be two flowfiles with the same uuid; I am binding them together so I can associate them later using the attribute embedded-document. *Should I, and if so how do I, force cloning to acquire new uuids?*

3. A question on theory... *Wouldn't all of this cloning be expensive?* Should I instead clone for just one of the new files and then mangle the original flowfile to become the other?
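
Untested, but something like this is what I have in mind for question 3. It assumes the same flowfile, session, UUID, DOCUMENT and CONCEPTS as in the condensed code below, and uses org.apache.nifi.processor.io.StreamCallback instead of nesting a read inside a write:

FlowFile document = session.clone( flowfile );

document = session.write( document, new StreamCallback()
{
  @Override public void process( InputStream in, OutputStream out ) throws IOException
  {
    // excerpt only the embedded document from 'in' and write it to 'out'...
  }
} );

// ...then rewrite the original flowfile in place so it becomes the concepts flowfile
FlowFile concepts = session.write( flowfile, new StreamCallback()
{
  @Override public void process( InputStream in, OutputStream out ) throws IOException
  {
    // parse the concepts from 'in' and write the serialized POJOs to 'out'...
  }
} );

document = session.putAttribute( document, "embedded-document", UUID );
concepts = session.putAttribute( concepts, "embedded-document", UUID );
session.transfer( document, DOCUMENT );
session.transfer( concepts, CONCEPTS );
// nothing left to remove: the original has become the concepts flowfile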

Thanks,
Russ


@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();

  if( flowfile == null )
  {
    context.yield();
    return;
  }

  try
  {
    final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );

    FlowFile document = session.clone( flowfile );

    // excerpt and write the embedded document to a new flowfile...
    document = session.write( document, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        // read from the original flowfile copying to the output flowfile...
        session.read( flowfile, new InputStreamCallback()
        {
          @Override public void process( InputStream inputStream ) throws IOException
          {
           ...
          }
        } );
      }
    } );

    FlowFile concepts = session.clone( flowfile );

    AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();

    // parse the concepts into a POJO list...
    session.read( concepts, new InputStreamCallback()
    {
      final ConceptList conceptList = conceptListHolder.get();

      @Override public void process( InputStream inputStream ) throws IOException
      {
        ...
      }
    } );

    // write out the concept POJOs serialized...
    concepts = session.write( concepts, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        ...
      }
    } );

    document = session.putAttribute( document, "embedded-document", UUID );
    concepts = session.putAttribute( concepts, "embedded-document", UUID );
    session.transfer( document, DOCUMENT );
    session.transfer( concepts, CONCEPTS );
    session.remove( flowfile );
  }
  catch( Exception e )
  {
    session.transfer( flowfile, FAILURE );
  }
}
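
In case it bears on question 1, here is an untested variant of the same logic I am considering, in which any clone already created is explicitly removed on the failure path before the original is routed to FAILURE:

FlowFile document = null, concepts = null;

try
{
  document = session.clone( flowfile );
  // excerpt and write the embedded document as above...

  concepts = session.clone( flowfile );
  // parse and serialize the concept POJOs as above...

  document = session.putAttribute( document, "embedded-document", UUID );
  concepts = session.putAttribute( concepts, "embedded-document", UUID );

  session.transfer( document, DOCUMENT );
  document = null;                          // transferred; leave it alone from here on
  session.transfer( concepts, CONCEPTS );
  concepts = null;
  session.remove( flowfile );
}
catch( Exception e )
{
  // every flowfile created in this session must end up transferred or removed
  if( document != null )
    session.remove( document );
  if( concepts != null )
    session.remove( concepts );
  session.transfer( flowfile, FAILURE );
}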

On 8/24/20 4:52 PM, Matt Burgess wrote:
Russell,

session.read() won't overwrite any contents of the incoming flow file,
but write() will. For #2, are you doing any processing on the file? If
not, wouldn't that be the original flowfile anyway? Or do you want it
to be a different flowfile on purpose (so you can send the incoming
flowfile to a different relationship)? You can use session.clone() to
create a new flowfile that has the same content and attributes from
the incoming flowfile, then handle that separately from the incoming
(original) flowfile. For #1, you could clone() the original flowfile
and do the read/process/write as part of a session.write(FlowFile,
StreamCallback) call, then you're technically reading the "new" file
content (which is the same of course) and overwriting it on the way
out.
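
Something along these lines (untested, just to illustrate; DOCUMENT and
ORIGINAL stand in for whatever relationships you have defined, and
StreamCallback is org.apache.nifi.processor.io.StreamCallback):

FlowFile original = session.get();
if( original == null )
  return;

FlowFile clone = session.clone( original );

// the callback is handed the clone's current content (same as the original's)
// and whatever it writes replaces that content on the way out
clone = session.write( clone, new StreamCallback()
{
  @Override public void process( InputStream in, OutputStream out ) throws IOException
  {
    // read from 'in', write the processed result to 'out'
  }
} );

session.transfer( clone, DOCUMENT );
session.transfer( original, ORIGINAL );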

Regards,
Matt

On Mon, Aug 24, 2020 at 6:37 PM Russell Bateman <r...@windofkeltia.com> wrote:
I am writing a custom processor that, upon processing a flowfile,
results in two new flowfiles (neither keeping the exact original
content) out of two different relationships. I might also like to route
the original flowfile to a separate relationship.

FlowFile original = session.get();

Do I need to call session.create() for the two new files?

  1. session.read() of the original file's contents, not all of the way
     through, but send the processed output from what I do read as
     flowfile 1.
  2. session.read() of the original file's contents and send the
     resulting output as flowfile 2.
  3. session.transfer() of the original flowfile.

I look at all of these session.read() and session.write() calls and I'm a
bit confused about which to use so that I won't lose the original
flowfile's content after #1 and can start over again in #2.
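
Roughly the skeleton I have in mind (untested; REL_ONE, REL_TWO and ORIGINAL are made-up relationship names):

FlowFile original = session.get();
if( original == null )
  return;

// do I need these two create() calls?
FlowFile one = session.create( original );
FlowFile two = session.create( original );

// #1: partial read of the original; processed output becomes flowfile 1
// #2: full read of the original; processed output becomes flowfile 2
// (this is where I am unsure which read()/write() calls to use)

session.transfer( one, REL_ONE );
session.transfer( two, REL_TWO );
session.transfer( original, ORIGINAL );     // #3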

Thanks.
