[ 
https://issues.apache.org/jira/browse/NIFI-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351090#comment-17351090
 ] 

Mark Payne commented on NIFI-8620:
----------------------------------

It is certainly not good that we are throwing a NullPointerException in this 
situation. We need to clean that up and throw a more appropriate Exception 
(FlowFileHandlingException). The underlying issue is that you cannot migrate 
children that were forked from a parent FlowFile without also migrating the 
parent FlowFile. I will update the Exception to ensure that it's more clear.

However, you can work around this restriction by first cloning the incoming 
FlowFile, and then migrating the clone to the second session, then use the 
second session to create the children, and you can then remove the clone:
{code:java}
final ProcessSession mainSession = sessionFactory.createSession(); 

FlowFile inputFlowFile = mainSession.get();
if (inputFlowFile == null) {
  return;
}

final ProcessSession secondSession = sessionFactory.createSession();

try (InputStream in = mainSession.read(clone)) {
  while (stillSomethingToRead) {

    // Create a clone and migrate it to the second session that we are able to 
fork children from it.
    FlowFile clone = session.clone(inputFlowFile);
    session.migrate(secondSession, clone);

    // read and process data
    inputData = in.read(...);
    transformedData = transform(inputData);

    // Create output flowfile
    FlowFile outputflowFile = secondSession.create(inputFlowfile);
    // write transformedData to outputflowFile content
    [...]
    // also put some attributes on outputflowFile
    [...]

    // Output the results without waiting
    secondSession.transfer(outputflowFiles, successRelationship);

    // Ensure that we remove the clone since we don't want to transfer it out 
anywhere.
    secondSession.remove(clone);


    secondSession.commit();
  }
}

mainsession.remove(inputFlowFile);  // Or transfer to an 'original' relationship

mainsession.commit()
{code}

> NullPointerException on commit with multiple ProcessSession
> -----------------------------------------------------------
>
>                 Key: NIFI-8620
>                 URL: https://issues.apache.org/jira/browse/NIFI-8620
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.13.0
>         Environment: Kubernetes on linux
>            Reporter: Stanislas Deneuville
>            Priority: Major
>
> {color:#000000}For my particular use case I read and process a big file and 
> produce smaller results along the way. I wanted to be able to regularly 
> commit what has already been done before the end of the whole process.{color}
> {color:#000000}So I inspired myself from the BinManager and created a custom 
> processor that use 2 ProcessSessions:{color}
>  * {color:#000000}a first main session for the read input flowfile and 
> creating new ones forked from it{color}
>  * {color:#000000}a second session for committing on the fly{color}
>   
> {color:#000000}The workflow is something like that:{color}
> {code:java}
> final ProcessSession mainSession = sessionFactory.createSession(); 
> final ProcessSession secondSession = sessionFactory.createSession();
> FlowFile inputFlowFile = mainSession.get();
> try (InputStream in = mainSession.read(inputFlowFile)) {
>   while (stillSomethingToRead) {
>     // read and process data
>     inputData = in.read(...);
>     transformedData = transform(inputData);
>     // Create output flowfile
>     FlowFile outputflowFile = mainSession.create(inputFlowfile);
>     // write transformedData to outputflowFile content
>     [...]
>     // also put some attributes on outputflowFile
>     [...]
>     // Output the results without waiting
>     mainSession.migrate(secondSession, outputflowFiles);
>     secondSession.transfer(outputflowFiles, successRelationship);
>     secondSession.commit();
>   }
> }
> mainsession.commit()
> {code}
>  
> It works well on Nifi Mock, however in a real Nifi environment I get a null 
> pointer exception during the commit.
> {noformat}
> [id=9f6342ac-ae78-30f7-22cf-6d7517618f19] Unknown error occurred: 
> java.lang.NullPointerException java.lang.NullPointerException: null
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.updateEventContentClaims(StandardProcessSession.java:786)
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:600)
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:353)
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:332){noformat}
>  
>  {color:#000000}Note: I don't do anything related to Data Provenance in my 
> code.{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to