Also, here is a pattern we tend to use for reading with try-with-resources:

FlowFile output = session.create(inputFlowFile); // create linkage in provenance
try (InputStream is = session.read(inputFlowFile);
     OutputStream os = session.write(output)) {
    // Do stuff with streams.

    is.close();
    os.close(); // These need to be closed before transfer
    session.transfer(output, REL_SUCCESS);
    session.transfer(inputFlowFile, REL_ORIGINAL);
} catch (Exception ex) {
    getLogger().error("Failed to process " + inputFlowFile, ex);
    session.remove(output);
    session.transfer(inputFlowFile, REL_ERROR);
}
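
A side note on the explicit close() calls: try-with-resources closes its resources itself when the body exits, in reverse declaration order, and before any catch block runs. Here is a small stdlib-only sketch that records that order. CloseOrderDemo and Tracked are made-up names for illustration, and no NiFi types are involved.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderDemo {
    // Records a "close" event so the ordering can be inspected afterwards.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> events;

        Tracked(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void close() {
            events.add("close " + name);
        }
    }

    // Runs a try-with-resources block and returns the recorded event order.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (Tracked in = new Tracked("in", events);
             Tracked out = new Tracked("out", events)) {
            events.add("body");
            if (fail) {
                throw new IllegalStateException("boom");
            }
        } catch (IllegalStateException ex) {
            // Both resources are already closed by the time we get here.
            events.add("catch");
            return events;
        }
        // Both resources are closed here as well.
        events.add("after try");
        return events;
    }
}
```

So one option, if you would rather not close the streams by hand, is to do the stream work inside the try block and move the transfer calls to just after it, where both streams are already closed.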

On Wed, Jul 24, 2019 at 2:28 PM Mike Thomsen <mikerthom...@gmail.com> wrote:

> Two problems I see right here:
>
> flowfile = session.create();
> flowfile = session.get();
>
>
> The second call replaces the newly created flowfile from session.create()
> with either null or the next element in the queue. That leaves a flowfile
> created in the session that you can neither transfer to a downstream
> relationship with session.transfer() nor remove with session.remove(). As a
> result, the session will fail to commit because it doesn't know what to do
> with that empty flowfile.
>
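
To make this first point concrete, here is a toy model of the bookkeeping. ToySession and all of its methods are hypothetical stand-ins written for this example. They are not the NiFi ProcessSession API, and the real session does considerably more. The sketch only shows why overwriting the reference leaves something the session cannot resolve.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for a session; NOT the NiFi API.
class ToySession {
    private final Queue<String> incoming = new ArrayDeque<>();
    private final Set<String> unresolved = new HashSet<>();
    private int created = 0;

    void enqueue(String name) {
        incoming.add(name);
    }

    // Returns the next queued item, or null when the queue is empty.
    String get() {
        String next = incoming.poll();
        if (next != null) {
            unresolved.add(next);
        }
        return next;
    }

    // Creates a new item that the session now has to account for.
    String create() {
        String item = "created-" + (++created);
        unresolved.add(item);
        return item;
    }

    void transfer(String item) {
        unresolved.remove(item);
    }

    // A commit is only possible once every tracked item has been resolved.
    boolean canCommit() {
        return unresolved.isEmpty();
    }
}

class NullGuardDemo {
    // Mirrors the problematic code: create(), then overwrite with get().
    static boolean buggyTrigger(ToySession session) {
        String flowFile = session.get();
        if (flowFile == null) {
            flowFile = session.create();
            flowFile = session.get(); // reference to the created item is lost
        }
        if (flowFile != null) {
            session.transfer(flowFile);
        }
        return session.canCommit();
    }

    // Guarded version: with nothing queued there is nothing to do.
    static boolean guardedTrigger(ToySession session) {
        String flowFile = session.get();
        if (flowFile == null) {
            return session.canCommit();
        }
        session.transfer(flowFile);
        return session.canCommit();
    }
}
```

In a real processor the usual equivalent of the guarded version is to return from onTrigger right after session.get() when it yields null.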
> Second:
>
> new FileReader("JSON_PATH")
>
>
> That will result in java.io.FileReader looking for a file literally named
> "JSON_PATH" in the current working directory. Unless that is your intent,
> you need to remove the quotes, pass the configured value (for example
> context.getProperty(JSON_PATH).getValue()), and make sure it points to a
> real path on the file system.
>
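
A stdlib-only sketch of the difference between passing the literal "JSON_PATH" and passing a variable that holds the path. LiteralVsValueDemo and its helpers are made up for illustration. The temp file stands in for whatever the processor property resolves to, and the sketch assumes there is no file actually named JSON_PATH in the working directory.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class LiteralVsValueDemo {
    // Reads the first line of the file at the given path.
    static String firstLine(String path) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            return reader.readLine();
        }
    }

    // True when FileReader cannot open the given path.
    static boolean isMissing(String path) {
        try {
            firstLine(path);
            return false;
        } catch (FileNotFoundException ex) {
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Writes a temp file, then reads it back through a variable holding its path.
    static String demo() {
        try {
            Path tmp = Files.createTempFile("sample", ".json");
            try {
                Files.write(tmp, "{\"a\": 1}".getBytes(StandardCharsets.UTF_8));
                String jsonPath = tmp.toString(); // stands in for the property value
                return firstLine(jsonPath);       // variable, not the literal "jsonPath"
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Here isMissing("JSON_PATH") goes looking for a file with exactly that name, while demo() reads the real file because it passes the variable's value. In the processor, that value would be the string already being logged via context.getProperty(JSON_PATH).getValue().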
> On Tue, Jul 23, 2019 at 10:13 PM Farooq Mustafa <farooq.must...@gmail.com>
> wrote:
>
>> Hello,
>>
>>
>> I am writing my first custom processor. I followed the example here
>> https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
>>
>>
>> In the settings, I have the file /tmp/jsoninput/sample1.json
>>
>>
>> The source code:
>>
>> public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>>
>>   final AtomicReference<String> value = new AtomicReference<>();
>>   getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + ">");
>>
>>   FlowFile flowfile = session.get();
>>   getLogger().warn("sample - Farooq - 2");
>>
>>   if ( null == flowfile) {
>>     getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** ");
>>     flowfile = session.create();
>>     flowfile = session.get();
>>     getLogger().warn("sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE ");
>>   }
>>   getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>>
>>   session.read(flowfile, new InputStreamCallback() {
>>     @Override
>>     public void process(InputStream in) throws IOException {
>>
>>       try {
>>         getLogger().warn("sample - Farooq - 3 JSON_PATH :" + context.getProperty(JSON_PATH).getValue());
>>         Object obj = new JSONParser().parse(new FileReader("JSON_PATH"));
>>         getLogger().warn("sample - Farooq - 3.1 ");
>>
>>
>>
>> The log output:
>>
>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1 , value = <null>
>>
>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2
>>
>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.1 - flowfile is NULL ***
>>
>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.2 - session.create() / session.get() --- NOT SURE
>>
>> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2.3 JSON_PATH :/tmp/jsoninput
>>
>> 2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.sitc.JsonProcessor JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException
>>
>> java.lang.NullPointerException: null
>>     at org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574)
>>     at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132)
>>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187)
>>
>>
>> And the basic flow:
>> [image: image.png]
>>
>>
>>
>> Any ideas or guidance will be appreciated.
>>
>>
>> Thanks
>>
>> Farooq
>>
>
