Also, here is a pattern we tend to use for reading using try-with
FlowFile output = session.create(inputFlowFile); //create linkage in provenance try (InputStream is = session.read(inputFlowFile); OutputStream os = session.write(output);) { //Do stuff with streams. is.close(); os.close(); //These need to be closed before transfer session.transfer(output, REL_SUCCESS); session.transfer(output, REL_ORIGINAL); } catch (Exception ex) { getLogger().error("", ex); session.remove(output); session.transfer(inputFlowFile, REL_ERROR); } On Wed, Jul 24, 2019 at 2:28 PM Mike Thomsen <mikerthom...@gmail.com> wrote: > Two problems I see right here are this: > > flowfile = session.create(); > flowfile = session.get(); > > > The second call will replace the newly created flowfile that comes from > session.create() with either null or the next element in the queue. That > will result in you having a flowfile created in the session that you cannot > either transfer to a downstream relationship with session.transfer() or > remove with session.remove(). End result there will be the session will > fail to commit because it doesn't know what it should do with that empty > flowfile. > > Second: > > new FileReader("JSON_PATH") > > > That will result in java.io.FileReader looking for a file named > "JSON_PATH" in the current working folder. Unless that is your intent, you > need to remove the quotes and make sure it's pointing to a real path on the > file system. > > On Tue, Jul 23, 2019 at 10:13 PM Farooq Mustafa <farooq.must...@gmail.com> > wrote: > >> Hello, >> >> >> I am writing my first custom processor. I followed the example here >> https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/ >> >> >> In the settings, I have /tmp/jsoniput/sample1.json file >> >> >> The source code; >> >> public void onTrigger(final ProcessContext context, final ProcessSession >> session) throws ProcessException { >> >> final AtomicReference<String> value = new AtomicReference<>(); >> getLogger().warn("sample - Farooq - 1 , value = <" + value.toString() + >> ">"); >> >> FlowFile flowfile = session.get(); >> getLogger().warn("sample - Farooq - 2"); >> >> if ( null == flowfile) { >> getLogger().warn("sample - Farooq - 2.1 - flowfile is NULL *** "); >> flowfile = session.create(); >> flowfile = session.get(); >> getLogger().warn("sample - Farooq - 2.2 - session.create() / >> session.get() --- NOT SURE "); >> } >> getLogger().warn("sample - Farooq - 2.3 JSON_PATH :" + >> context.getProperty(JSON_PATH).getValue()); >> >> session.read(flowfile, new InputStreamCallback() { >> @Override >> public void process(InputStream in) throws IOException { >> >> try { >> getLogger().warn("sample - Farooq - 3 JSON_PATH :" + >> context.getProperty(JSON_PATH).getValue()); >> Object obj = new JSONParser().parse(new FileReader("JSON_PATH")); >> getLogger().warn("sample - Farooq - 3.1 "); >> >> >> >> The log output: >> >> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 1 >> , value = <null> >> >> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - 2 >> >> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - >> 2.1 - flowfile is NULL *** >> >> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - >> 2.2 - session.create() / session.get() --- NOT SURE >> >> 2019-07-20 13:58:42,625 WARN [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] sample - Farooq - >> 2.3 JSON_PATH :/tmp/jsoninput >> >> 2019-07-20 13:58:42,626 ERROR [Timer-Driven Process Thread-4] >> o.a.nifi.processors.sitc.JsonProcessor >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] >> JsonProcessor[id=fd8fe4ca-016b-1000-a6ed-c0e224bc7267] failed to process >> session due to java.lang.NullPointerException; Processor Administratively >> Yielded for 1 sec: java.lang.NullPointerException >> >> java.lang.NullPointerException: null >> >> at >> org.apache.nifi.controller.repository.StandardProcessSession.getRecord(StandardProcessSession.java:574) >> >> at >> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3132) >> >> at >> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2187) >> >> >> And the basic flow: >> [image: image.png] >> >> >> >> Any ideas or guidance will be appreciated. >> >> >> Thanks >> >> Farooq >> >