I needed to get back here...

I took this advice to heart and finished my processor. Thanks to Matt and Mark for all their suggestions! They cleared up a few things. There was one bug in the code that was mine, small, but significant in its effect on the rest. That mistake also explained why I thought the uuidwas identical between at least two of the cloned flowfiles. What I would wish for, and am probably not strong enough to write, would be a synthesis of the session methods read() and write() and how best to use them (one-to-one, one-to-many, etc.). Javadoc is too paratactic by nature, the NiFi Developer's Guide almost silent on these methods. If it were not for the many existing examples using these methods, it would be hard to learn to do even simple things. I did look for something closer to what I needed to do, but unsuccessfully.

Thanks again. If anything, the NiFi mailing lists are a place both for great information and being treated well.

Russ

On 8/25/20 12:24 PM, Mark Payne wrote:
Russ,

Several comments here. I’ve included them inline, below.

Hope it’s helpful.

Thanks
-Mark


On Aug 25, 2020, at 2:09 PM, Russell Bateman <r...@windofkeltia.com> wrote:

Thanks for your suggestions, Matt.

I decided to keep the original flowfile only upon failure. So, I have the 
embedded-document file and the serialized POJOs created from processing the non 
embedded-document part as the result if successful. (Condensed code at end...)

Now I have three questions...

1. I seem not to have placated NiFi with the assurance that I have transferred 
or disposed of all three flowfiles suitably. I get:

java.lang.AssertionError: 
org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot commit 
session because the following FlowFiles have not been removed or transferred: 
[2]
This is probably because at the end of the block, you catch Exception and then 
route the original FlowFile to failure. But you’ve already cloned it and didn’t 
deal with the clone.
*Which of the three flowfiles does [2] refer to? Or does it just mean I botched 
two flowfiles. *

2. session.clone()generates a new flowfile with the identical uuid. I don't 
think I want the result to be two flowfiles with the same uuid. I am binding 
them together so I can associate them later using attribute embedded-document. 
*Should I/How do I force cloning to acquire new **uuid**s?*
This appears to actually be a bug in the mock framework. It *should* have a 
unique uuid, and would in a running NiFi instance. Feel free to file a Jira for 
that.
3. A question on theory... *Wouldn't all of this cloning be expensive* and I 
should just clone for one of the new files and then mangle the original 
flowfile to become the other?
session.clone() is not particularly expensive. It’s just creating a new 
FlowFile object. It doesn’t clone the FlowFile’s contents.
That said, it is probably more appropriate to call session.create(flowFile), 
rather than session.clone(flowFile). It makes little difference in practice but 
what you’re really doing is forking a child, and that will come across more 
cleanly in the Provenance lineage that is generated if using 
session.create(flowFile).

Additional comments in code below.


Thanks,
Russ


@Override
public void onTrigger( final ProcessContext context, final ProcessSession 
session ) throws ProcessException
{
   FlowFile flowfile = session.get();

   if( flowfile == null )
   {
     context.yield();
No need to yield here. Let the framework handle the scheduling. 
ProcessContext.yield() is meant for cases where you’re communicating with some 
external service, for instance, and you know the service is unavailable or rate 
limiting you or something like that. You can’t make any progress, so tell NiFi 
to not bother wasting CPU cycles with this Processor.
     return;
   }

   try
   {
     final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );

     FlowFile document = session.clone( flowfile );

*    // excerpt and write the embedded document to a new flowfile...*
     session.write( document, new OutputStreamCallback()
     {
       @Override public void process( OutputStream outputStream )
       {
         // read from the original flowfile copying to the output flowfile...
         session.read( flowfile, new InputStreamCallback()
         {
           @Override public void process( InputStream inputStream ) throws 
IOException
           {
            ...
           }
         } );
       }
     } );

     FlowFile concepts = session.clone( flowfile );

     AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();

*    // parse the concepts into a POJO list...*
     session.read( concepts, new InputStreamCallback()
     {
       final ConceptList conceptList = conceptListHolder.get();

       @Override public void process( InputStream inputStream ) throws 
IOException
       {
         ...
       }
     } );

*    // write out the concept POJOs serialized...*
     session.write( concepts, new OutputStreamCallback()
     {
       @Override public void process( OutputStream outputStream )
       {
         ...
       }
     } );
At this point, you’ve written to the ‘document’ flowfile once, written to the 
‘concepts’ flowfile once and read the original FlowFile twice (well read the 
original flowfile once and read the clone once, which amounts to the same 
thing).
You could instead do something like:

FlowFile document = session.create(flowFile);
FlowFile concepts = session.create(flowFile);

try (final InputStream input = session.read(flowFile)) {
     try (final OutputStream documentOut = session.write(document);
           final OutputStream conceptOut = session.write(concept)) {

          // Perform processing.

     }
}

In this way, you avoid reading the input FlowFile twice. Of course, you 
provided an abstraction of the code, so it’s possible that this won’t actually 
work, depending on what you’re doing to read the input...

     document = session.putAttribute( document, "embedded-document", UUID );
     concepts = session.putAttribute( document, "embedded-document", UUID );
     session.transfer( document, DOCUMENT );
     session.transfer( concepts, CONCEPTS );
     session.remove( flowfile );
   }
   catch( Exception e )
   {
     session.transfer( flowfile, FAILURE );
   }
}

On 8/24/20 4:52 PM, Matt Burgess wrote:
Russell,

session.read() won't overwrite any contents of the incoming flow file,
but write() will. For #2, are you doing any processing on the file? If
not, wouldn't that be the original flowfile anyway? Or do you want it
to be a different flowfile on purpose (so you can send the incoming
flowfile to a different relationship)? You can use session.clone() to
create a new flowfile that has the same content and attributes from
the incoming flowfile, then handle that separately from the incoming
(original) flowfile. For #1, you could clone() the original flowfile
and do the read/process/write as part of a session.write(FlowFile,
StreamCallback) call, then you're technically reading the "new" file
content (which is the same of course) and overwriting it on the way
out.

Regards,
Matt

On Mon, Aug 24, 2020 at 6:37 PM Russell Bateman <r...@windofkeltia.com> wrote:
I am writing a custom processor that, upon processing a flowfile,
results  in two new flowfiles (neither keeping the exact, original
content) out two different relationships. I might like to route the
original flowfile to a separate relationship.

FlowFile original = session.get();

Do I need to call session.create()for the two new files?

  1. session.read()of original file's contents, not all of the way
     through, but send the processed output from what I do read as
     flowfile 1.
  2. session.read()of original file's contents and send resulting output
     as flowfile 2.
  3. session.transfer()of original flowfile.

I look at all of these session.read()and session.write()calls and I'm a
bit confused as to which to use that won't lose the original flowfile's
content after #1 so I can start over again in #2.

Thanks.

Reply via email to