Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r239241291 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); - String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isEmpty(code)) { - try (InputStream inputStream = session.read(flowFile)) { - // If no code was provided, assume it is in the content of the incoming flow file - code = IOUtils.toString(inputStream, charset); - } catch (IOException ioe) { - log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - code = StringEscapeUtils.escapeJavaScript(code); - String payload = "{\"code\":\"" + code + "\"}"; + try { - final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); - log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); - if (result == null) { - session.transfer(flowFile, REL_FAILURE); - } else { + + if (isBatchJob) { + + String jsonResponse = null; + + if (StringUtils.isEmpty(jsonResponse)) { + try (InputStream inputStream = session.read(flowFile)) { + // If no code was provided, assume it is in the content of the incoming flow file + jsonResponse = IOUtils.toString(inputStream, charset); + } catch (IOException ioe) { + log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + 
log.debug(" ====> jsonResponse: " + jsonResponse); + try { - final JSONObject output = result.getJSONObject("data"); - flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); - session.transfer(flowFile, REL_SUCCESS); - } catch (JSONException je) { - // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) - log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); - flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + + final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + + Map<String, String> headers = new HashMap<>(); + headers.put("Content-Type", LivySessionService.APPLICATION_JSON); + headers.put("X-Requested-By", LivySessionService.USER); + headers.put("Accept", "application/json"); + + JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + + flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); - flowFile = session.penalize(flowFile); + + Thread.sleep(statusCheckInterval); + + String state = jobInfo.getString("state"); + log.debug(" ====> jsonResponseObj State: " + state); + + switch (state) { + case "success": + log.debug(" ====> success State: " + state); + session.transfer(flowFile, REL_SUCCESS); + break; + case "dead": + log.debug(" ====> dead State: " + state); + session.transfer(flowFile, REL_FAILURE); + break; + default: + log.debug(" ====> default State: " + state); + session.transfer(flowFile, REL_WAIT); + break; + } + + } catch (JSONException | InterruptedException e) { + + //Incoming flow file is not an JSON file hence consider it 
to be an triggering point + + String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " + + "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }"; --- End diff -- Since the current processor is called ExecuteSparkInteractive, you could move the batch functionality out into a separate processor called something like SubmitSparkJob. The outgoing flow file could contain the application ID and/or any other information that would allow you to monitor the job downstream (perhaps with InvokeHTTP through Livy). Are you still interested in working on this? It seems like very nice functionality to have in NiFi!
---