Thanks for your suggestions, Matt.
I decided to keep the original flowfile only upon failure. So, if
processing succeeds, the result is the embedded-document file plus the
serialized POJOs created from processing the non-embedded-document part.
(Condensed code at end...)
Now I have three questions...
1. I seem not to have placated NiFi with the assurance that I have
transferred or disposed of all three flowfiles suitably. I get:
java.lang.AssertionError:
org.apache.nifi.processor.exception.FlowFileHandlingException: Cannot
commit session because the following FlowFiles have not been removed or
transferred: [2]
*Which of the three flowfiles does [2] refer to? Or does it just mean I
botched two flowfiles?*
2. session.clone() generates a new flowfile with the identical uuid. I
don't think I want the result to be two flowfiles with the same uuid. I
am binding them together so I can associate them later using the attribute
embedded-document. *Should I (and if so, how do I) force cloning to
acquire new uuids?*
3. A question on theory: *wouldn't all of this cloning be expensive?*
Should I instead clone only one of the new files and then mangle the
original flowfile to become the other?
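To make the linking idea in question 2 concrete, here is a minimal plain-Java sketch, assuming FlowFile attributes can be modeled as a `Map<String, String>`. Each output gets its own fresh identity in `uuid` and carries the parent's `uuid` in `embedded-document`, so a later step can regroup the siblings. The class and method names are hypothetical, and the sketch says nothing about how NiFi itself assigns `uuid` during `session.clone()`.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical helper: plain maps stand in for FlowFile attributes.
class EmbeddedDocumentLink {
    // Derive a child's attributes: copy the parent's, give the child its own
    // uuid, and record the parent's uuid under "embedded-document".
    static Map<String, String> child(Map<String, String> parentAttributes) {
        Map<String, String> attributes = new HashMap<>(parentAttributes);
        attributes.put("uuid", UUID.randomUUID().toString());
        attributes.put("embedded-document", parentAttributes.get("uuid"));
        return attributes;
    }

    // Regroup children by the parent uuid they carry (throws if a child
    // has no "embedded-document" value, since groupingBy rejects null keys).
    static Map<String, List<Map<String, String>>> groupByParent(List<Map<String, String>> flowfiles) {
        return flowfiles.stream()
                .collect(Collectors.groupingBy(a -> a.get("embedded-document")));
    }
}
```

The design choice here is that identity and association are kept apart: `uuid` stays unique per flowfile, while the shared `embedded-document` value is what binds the document and concept outputs together.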
Thanks,
Russ
@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  if( flowfile == null )
  {
    context.yield();
    return;
  }

  try
  {
    final String UUID = flowfile.getAttribute( NiFiUtilities.UUID );
    FlowFile document = session.clone( flowfile );

    // excerpt and write the embedded document to a new flowfile...
    session.write( document, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        // read from the original flowfile, copying to the output flowfile...
        session.read( flowfile, new InputStreamCallback()
        {
          @Override public void process( InputStream inputStream ) throws IOException
          {
            ...
          }
        } );
      }
    } );

    FlowFile concepts = session.clone( flowfile );
    AtomicReference< ConceptList > conceptListHolder = new AtomicReference<>();

    // parse the concepts into a POJO list...
    session.read( concepts, new InputStreamCallback()
    {
      final ConceptList conceptList = conceptListHolder.get();

      @Override public void process( InputStream inputStream ) throws IOException
      {
        ...
      }
    } );

    // write out the concept POJOs serialized...
    session.write( concepts, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream )
      {
        ...
      }
    } );

    document = session.putAttribute( document, "embedded-document", UUID );
    concepts = session.putAttribute( document, "embedded-document", UUID );
    session.transfer( document, DOCUMENT );
    session.transfer( concepts, CONCEPTS );
    session.remove( flowfile );
  }
  catch( Exception e )
  {
    session.transfer( flowfile, FAILURE );
  }
}
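The error's wording states the rule: before commit, every FlowFile the session knows about (the incoming one and each clone) must have been transferred or removed. Below is a toy model of just that rule. `ToySession` is hypothetical, not NiFi's `ProcessSession`, and it tracks ids only, not FlowFile versions. It replays the tail of the code above, where the second `putAttribute()` call is passed `document` rather than `concepts`: the reassigned `concepts` then refers to the document clone, so the original concepts clone is never transferred. Read this way, `[2]` looks like a list of FlowFile ids rather than a count, although the ids NiFi's test framework assigns may not match the toy's numbering.

```java
import java.util.*;

// Toy model of the commit rule only; names are hypothetical, not NiFi's API.
class ToySession {
    private long nextId = 1;
    private final Set<Long> outstanding = new TreeSet<>(); // ids not yet transferred or removed

    long get() { return track(); }                       // the incoming flowfile
    long cloneOf(long parent) { return track(); }        // every clone is a new flowfile to account for
    long putAttribute(long flowfile) { return flowfile; } // same flowfile (same id) handed back
    void transfer(long flowfile) { outstanding.remove(flowfile); }
    void remove(long flowfile) { outstanding.remove(flowfile); }
    Set<Long> unaccounted() { return Collections.unmodifiableSet(outstanding); }

    private long track() {
        long id = nextId++;
        outstanding.add(id);
        return id;
    }

    // Replays the end of onTrigger(); passConcepts=false mirrors the posted code.
    static Set<Long> replay(boolean passConcepts) {
        ToySession session = new ToySession();
        long flowfile = session.get();             // id 1
        long document = session.cloneOf(flowfile); // id 2
        long concepts = session.cloneOf(flowfile); // id 3
        document = session.putAttribute(document);
        concepts = session.putAttribute(passConcepts ? concepts : document);
        session.transfer(document);
        session.transfer(concepts);
        session.remove(flowfile);
        return session.unaccounted();
    }
}
```

`replay(false)` leaves id 3 outstanding; `replay(true)` leaves nothing. By the same rule, the `catch` block above leaves both clones outstanding if an exception is thrown after they are created, so removing them there before routing the original to FAILURE (which means declaring them before the `try`) would keep the session balanced.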
On 8/24/20 4:52 PM, Matt Burgess wrote:
Russell,
session.read() won't overwrite any contents of the incoming flow file,
but write() will. For #2, are you doing any processing on the file? If
not, wouldn't that be the original flowfile anyway? Or do you want it
to be a different flowfile on purpose (so you can send the incoming
flowfile to a different relationship)? You can use session.clone() to
create a new flowfile that has the same content and attributes as
the incoming flowfile, then handle that separately from the incoming
(original) flowfile. For #1, you could clone() the original flowfile
and do the read/process/write as part of a session.write(FlowFile,
StreamCallback) call; then you're technically reading the "new" file's
content (which is the same, of course) and overwriting it on the way
out.
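Matt's suggestion for #1 can be sketched without NiFi on the classpath. The interface below is a local stand-in with the same shape as NiFi's `StreamCallback` (`process(InputStream, OutputStream)`), and `rewrite()` imitates `session.write(clone, callback)`: the callback reads the clone's content, which starts out identical to the original's, and its output becomes the clone's new content. The `END` marker and the names `StreamCallbackShape`, `CloneThenRewrite`, and `KEEP_UNTIL_END` are hypothetical.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Local stand-in shaped like NiFi's StreamCallback, so the sketch runs standalone.
interface StreamCallbackShape {
    void process(InputStream in, OutputStream out) throws IOException;
}

class CloneThenRewrite {
    // Hypothetical transform: keep lines until a made-up "END" marker line.
    static final StreamCallbackShape KEEP_UNTIL_END = (in, out) -> {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
        String line;
        while ((line = reader.readLine()) != null && !line.equals("END")) {
            writer.write(line);
            writer.write('\n');
        }
        writer.flush(); // flush, but leave closing the streams to the caller
    };

    // Imitates session.write(clone, callback): read the clone's bytes and
    // return the bytes that replace them. The input array is never modified.
    static byte[] rewrite(byte[] cloneContent, StreamCallbackShape callback) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            callback.process(new ByteArrayInputStream(cloneContent), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

In a processor, the same lambda would be passed straight to `session.write(clone, ...)`; writing the clone leaves the original's content alone, so the original can still be routed to its own relationship.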
Regards,
Matt
On Mon, Aug 24, 2020 at 6:37 PM Russell Bateman <r...@windofkeltia.com> wrote:
I am writing a custom processor that, upon processing a flowfile,
results in two new flowfiles (neither keeping the exact original
content) sent out two different relationships. I might also like to route
the original flowfile to a separate relationship.
FlowFile original = session.get();
Do I need to call session.create() for the two new files?
1. session.read() of the original file's contents, not all of the way
through, but send the processed output from what I do read as
flowfile 1.
2. session.read() of the original file's contents and send the resulting
output as flowfile 2.
3. session.transfer() of the original flowfile.
I look at all of these session.read() and session.write() calls and I'm a
bit confused as to which one to use so that I don't lose the original
flowfile's content after #1 and can start over again in #2.
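On the worry about steps 1 and 2: per Matt's reply above, `session.read()` does not overwrite the incoming flowfile's content. The toy model below uses no NiFi classes and shows why a partial first pass does no harm: the original's bytes stay fixed, and each read opens a fresh stream at the start. All names (`ReadPass`, `TwoPassesOverOriginal`, `FIRST_LINE`, `LINE_COUNT`) are hypothetical; in a processor, each pass would be its own `session.read(original, ...)` call whose result is then written to a separate new flowfile.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// One "pass" over the content; allowed to stop early.
interface ReadPass {
    String apply(BufferedReader reader) throws IOException;
}

class TwoPassesOverOriginal {
    // Each call opens a new stream over the same, unchanged bytes.
    static String read(byte[] original, ReadPass pass) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(original), StandardCharsets.UTF_8))) {
            return pass.apply(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Pass for flowfile 1: stops after the first line ("not all of the way through").
    static final ReadPass FIRST_LINE = reader -> {
        String line = reader.readLine();
        return line == null ? "" : line;
    };

    // Pass for flowfile 2: consumes everything and counts the lines.
    static final ReadPass LINE_COUNT = reader -> {
        int count = 0;
        while (reader.readLine() != null) {
            count++;
        }
        return Integer.toString(count);
    };
}
```

Because the second pass starts from the beginning, it still sees all three lines even though the first pass stopped after one.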
Thanks.