Russ,
Several comments here. I’ve included them inline, below.
Hope it’s helpful.
Thanks
-Mark
On Aug 25, 2020, at 2:09 PM, Russell Bateman <r...@windofkeltia.com> wrote:
Thanks for your suggestions, Matt.
I decided to keep the original flowfile only upon failure. So, I have the
embedded-document file and the serialized POJOs created from processing the
non-embedded-document part as the result if successful. (Condensed code at end...)
Now I have three questions...
1. I seem not to have placated NiFi with the assurance that I have transferred
or disposed of all three flowfiles suitably. I get:
java.lang.AssertionError:
org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot commit
session because the following FlowFiles have not been removed or transferred:
[2]
This is probably because at the end of the block, you catch Exception and then
route the original FlowFile to failure. But you’ve already cloned it and didn’t
deal with the clone.
*Which of the three flowfiles does [2] refer to? Or does it just mean I botched
two flowfiles?*
2. session.clone() generates a new flowfile with an identical uuid. I don't
think I want the result to be two flowfiles with the same uuid. I am binding
them together so I can associate them later using the attribute embedded-document.
*Should I, and how do I, force cloning to acquire new uuids?*
This appears to actually be a bug in the mock framework. It *should* have a
unique uuid, and would in a running NiFi instance. Feel free to file a Jira for
that.
3. A question on theory... *Wouldn't all of this cloning be expensive?* Shouldn't
I just clone for one of the new files and then mangle the original flowfile to
become the other?
session.clone() is not particularly expensive. It’s just creating a new
FlowFile object. It doesn’t clone the FlowFile’s contents.
That said, it is probably more appropriate to call session.create(flowFile),
rather than session.clone(flowFile). It makes little difference in practice but
what you’re really doing is forking a child, and that will come across more
cleanly in the Provenance lineage that is generated if using
session.create(flowFile).
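To see why clone (or create-with-parent) is cheap, it helps to remember that a
FlowFile is only a small metadata object holding a reference to content stored
in the content repository. A minimal plain-Java sketch of that idea, using a
hypothetical FlowFileLike stand-in (not the real FlowFile class), shows that
cloning copies the reference, never the bytes:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CloneCost {
    // Hypothetical stand-in for a FlowFile: attributes plus a *reference*
    // to immutable content (in NiFi, a content-repository claim).
    record FlowFileLike(String uuid, Map<String, String> attributes, byte[] contentRef) {
        FlowFileLike cloneOf() {
            // New identity and copied attributes, but the SAME content reference.
            return new FlowFileLike(UUID.randomUUID().toString(),
                                    new HashMap<>(attributes), contentRef);
        }
    }

    public static void main(String[] args) {
        byte[] content = "ten megabytes, conceptually".getBytes(StandardCharsets.UTF_8);
        FlowFileLike original = new FlowFileLike(UUID.randomUUID().toString(),
                                                 new HashMap<>(), content);
        FlowFileLike clone = original.cloneOf();

        // Cloning did not copy the bytes: both point at the same content.
        System.out.println(original.contentRef() == clone.contentRef());
        System.out.println(original.uuid().equals(clone.uuid()));
    }
}
```

In a running NiFi instance the clone likewise shares the parent's content claim
and gets its own uuid, which is why the identical-uuid behavior above looks like
a mock-framework bug.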
Additional comments in code below.
Thanks,
Russ
@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  if( flowfile == null )
  {
    context.yield();
No need to yield here. Let the framework handle the scheduling.
ProcessContext.yield() is meant for cases where you’re communicating with some
external service, for instance, and you know the service is unavailable or rate
limiting you or something like that. You can’t make any progress, so tell NiFi
to not bother wasting CPU cycles with this Processor.
    return;
  }
  try
  {
    final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );
    FlowFile document = session.clone( flowfile );

    // excerpt and write the embedded document to a new flowfile...
    session.write( document, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        // read from the original flowfile copying to the output flowfile...
        session.read( flowfile, new InputStreamCallback()
        {
          @Override public void process( InputStream inputStream ) throws IOException
          {
            ...
          }
        } );
      }
    } );
    FlowFile concepts = session.clone( flowfile );
    AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();

    // parse the concepts into a POJO list...
    session.read( concepts, new InputStreamCallback()
    {
      @Override public void process( InputStream inputStream ) throws IOException
      {
        ...
      }
    } );

    // write out the concept POJOs serialized...
    session.write( concepts, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        final ConceptList conceptList = conceptListHolder.get();
        ...
      }
    } );
At this point, you've written to the 'document' flowfile once, written to the
'concepts' flowfile once, and read the original FlowFile twice (well, read the
original flowfile once and read the clone once, which amounts to the same
thing).
You could instead do something like:
FlowFile document = session.create(flowFile);
FlowFile concepts = session.create(flowFile);

try (final InputStream input = session.read(flowFile)) {
    try (final OutputStream documentOut = session.write(document);
         final OutputStream conceptsOut = session.write(concepts)) {
        // Perform processing.
    }
}
In this way, you avoid reading the input FlowFile twice. Of course, you
provided an abstraction of the code, so it’s possible that this won’t actually
work, depending on what you’re doing to read the input...
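The single-pass fork pattern above can be sketched without any NiFi dependency.
In the processor, the input stream would come from session.read(flowFile) and
the two output streams from session.write(document) and session.write(concepts);
here they are plain byte-array streams, and the "DOC:" split rule is invented
purely for illustration:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class SinglePassFork {
    /**
     * Read the source exactly once, routing each portion to the appropriate
     * output. The rule here is hypothetical: lines starting with "DOC:" go to
     * the document stream, everything else to the concepts stream.
     */
    static void fork(InputStream in, OutputStream documentOut, OutputStream conceptsOut)
            throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.startsWith("DOC:")) {
                documentOut.write((line.substring(4) + "\n").getBytes(StandardCharsets.UTF_8));
            } else {
                conceptsOut.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        String source = "DOC:embedded document text\nconcept one\nconcept two\n";
        ByteArrayOutputStream documentOut = new ByteArrayOutputStream();
        ByteArrayOutputStream conceptsOut = new ByteArrayOutputStream();
        fork(new ByteArrayInputStream(source.getBytes(StandardCharsets.UTF_8)),
             documentOut, conceptsOut);
        System.out.print(documentOut);
        System.out.print(conceptsOut);
    }
}
```

Whether a single pass is feasible depends, as noted, on how the embedded
document and the concepts are interleaved in the input.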
    document = session.putAttribute( document, "embedded-document", UUID );
    concepts = session.putAttribute( concepts, "embedded-document", UUID );

    session.transfer( document, DOCUMENT );
    session.transfer( concepts, CONCEPTS );
    session.remove( flowfile );
  }
  catch( Exception e )
  {
    session.transfer( flowfile, FAILURE );
  }
}
On 8/24/20 4:52 PM, Matt Burgess wrote:
Russell,
session.read() won't overwrite any contents of the incoming flow file,
but write() will. For #2, are you doing any processing on the file? If
not, wouldn't that be the original flowfile anyway? Or do you want it
to be a different flowfile on purpose (so you can send the incoming
flowfile to a different relationship)? You can use session.clone() to
create a new flowfile that has the same content and attributes from
the incoming flowfile, then handle that separately from the incoming
(original) flowfile. For #1, you could clone() the original flowfile
and do the read/process/write as part of a session.write(FlowFile,
StreamCallback) call, then you're technically reading the "new" file
content (which is the same of course) and overwriting it on the way
out.
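The read-and-overwrite-in-one-pass idea behind StreamCallback can be sketched in
plain Java. The StreamCallback interface and the write helper below are
simplified stand-ins for the NiFi API (not the real classes), and the
upper-casing transform is a made-up example of "processing":

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class StreamCallbackSketch {
    // Analogue of NiFi's StreamCallback: read the current content and
    // write the replacement content in a single pass.
    interface StreamCallback {
        void process(InputStream in, OutputStream out) throws IOException;
    }

    // Stand-in for session.write(flowFile, callback): the callback sees the
    // old content on 'in', and whatever it writes to 'out' becomes the new content.
    static byte[] write(byte[] currentContent, StreamCallback callback) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        callback.process(new ByteArrayInputStream(currentContent), out);
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] content = "some flowfile content".getBytes(StandardCharsets.UTF_8);
        byte[] replaced = write(content, (in, out) -> {
            // Hypothetical processing: upper-case every byte on the way through.
            int b;
            while ((b = in.read()) != -1) {
                out.write(Character.toUpperCase(b));
            }
        });
        System.out.println(new String(replaced, StandardCharsets.UTF_8));
    }
}
```

Applied to a clone, this leaves the original flowfile's content untouched, which
is the point of Matt's suggestion.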
Regards,
Matt
On Mon, Aug 24, 2020 at 6:37 PM Russell Bateman <r...@windofkeltia.com> wrote:
I am writing a custom processor that, upon processing a flowfile,
results in two new flowfiles (neither keeping the exact, original
content) out of two different relationships. I might like to route the
original flowfile to a separate relationship.
FlowFile original = session.get();
Do I need to call session.create() for the two new files?
1. session.read() of original file's contents, not all of the way
through, but send the processed output from what I do read as
flowfile 1.
2. session.read() of original file's contents and send resulting output
as flowfile 2.
3. session.transfer() of original flowfile.
I look at all of these session.read() and session.write() calls and I'm a
bit confused as to which to use that won't lose the original flowfile's
content after #1 so I can start over again in #2.
Thanks.