GitHub user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349239 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); - String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isEmpty(code)) { - try (InputStream inputStream = session.read(flowFile)) { - // If no code was provided, assume it is in the content of the incoming flow file - code = IOUtils.toString(inputStream, charset); - } catch (IOException ioe) { - log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - code = StringEscapeUtils.escapeJavaScript(code); - String payload = "{\"code\":\"" + code + "\"}"; + try { - final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); - log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); - if (result == null) { - session.transfer(flowFile, REL_FAILURE); - } else { + + if (isBatchJob) { + + String jsonResponse = null; + + if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- Once the current approach is accepted, this can be taken care of.
---