Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213566 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); - String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isEmpty(code)) { - try (InputStream inputStream = session.read(flowFile)) { - // If no code was provided, assume it is in the content of the incoming flow file - code = IOUtils.toString(inputStream, charset); - } catch (IOException ioe) { - log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - code = StringEscapeUtils.escapeJavaScript(code); - String payload = "{\"code\":\"" + code + "\"}"; + try { - final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); - log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); - if (result == null) { - session.transfer(flowFile, REL_FAILURE); - } else { + + if (isBatchJob) { + + String jsonResponse = null; + + if (StringUtils.isEmpty(jsonResponse)) { + try (InputStream inputStream = session.read(flowFile)) { + // If no code was provided, assume it is in the content of the incoming flow file + jsonResponse = IOUtils.toString(inputStream, charset); + } catch (IOException ioe) { + log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + log.debug(" ====> jsonResponse: " + jsonResponse); + try { - final JSONObject output = result.getJSONObject("data"); - flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); - session.transfer(flowFile, REL_SUCCESS); - } catch (JSONException je) { - // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) - log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); - flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + + final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + + Map<String, String> headers = new HashMap<>(); + headers.put("Content-Type", LivySessionService.APPLICATION_JSON); + headers.put("X-Requested-By", LivySessionService.USER); + headers.put("Accept", "application/json"); + + JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + + flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); - flowFile = session.penalize(flowFile); + + Thread.sleep(statusCheckInterval); + + String state = jobInfo.getString("state"); + log.debug(" ====> jsonResponseObj State: " + state); + + switch (state) { + case "success": + log.debug(" ====> success State: " + state); + session.transfer(flowFile, REL_SUCCESS); + break; + case "dead": + log.debug(" ====> dead State: " + state); + session.transfer(flowFile, REL_FAILURE); + break; + default: + log.debug(" ====> default State: " + state); + session.transfer(flowFile, REL_WAIT); --- End diff -- Same as above for these log.debug messages as well.
---