Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r175496952
  
    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
     
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
     
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    +                    try (InputStream inputStream = session.read(flowFile)) {
    +                        // If no code was provided, assume it is in the content of the incoming flow file
    +                        jsonResponse = IOUtils.toString(inputStream, charset);
    +                    } catch (IOException ioe) {
    +                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_FAILURE);
    +                        return;
    +                    }
    +                }
    +
    +                log.debug(" ====> jsonResponse: " + jsonResponse);
    +
                     try {
    -                    final JSONObject output = result.getJSONObject("data");
    -                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
    -                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    session.transfer(flowFile, REL_SUCCESS);
    -                } catch (JSONException je) {
    -                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
    -                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
    -                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
    +
    +                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
    +
    +                    Map<String, String> headers = new HashMap<>();
    +                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
    +                    headers.put("X-Requested-By", LivySessionService.USER);
    +                    headers.put("Accept", "application/json");
    +
    +                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
    +
    +                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
    -                    flowFile = session.penalize(flowFile);
    +
    +                    Thread.sleep(statusCheckInterval);
    +
    +                    String state  = jobInfo.getString("state");
    +                    log.debug(" ====> jsonResponseObj State: " + state);
    +
    +                    switch (state) {
    +                        case "success":
    +                            log.debug(" ====> success State: " + state);
    +                            session.transfer(flowFile, REL_SUCCESS);
    +                            break;
    +                        case "dead":
    +                            log.debug(" ====> dead State: " + state);
    +                            session.transfer(flowFile, REL_FAILURE);
    +                            break;
    +                        default:
    +                            log.debug(" ====> default State: " + state);
    +                            session.transfer(flowFile, REL_WAIT);
    +                            break;
    +                    }
    +
    +                } catch (JSONException | InterruptedException e) {
    +
    +                    //Incoming flow file is not an JSON file hence consider it to be an triggering point
    +
    +                    String batchPayload = "{ \"pyFiles\": [\"" +context.getProperty(PY_FILES).getValue()+ "\"], " +
    +                            "\"file\" : \""+context.getProperty(MAIN_PY_FILE).getValue()+"\" }";
    --- End diff --
    
    Sometime this week I am planning to add support for jar files, args, and the application name via the Livy options.
    
    The catch here is that, unlike plain Spark code, a batch job takes time to finish, which is expected. So, as a hack, I re-route the JSON response from the batch submission back to the processor itself: on each trigger I check whether the incoming flow file is JSON, and if so I extract the Livy URL from it and post to that URL again to fetch the status of the batch job for as long as it runs. Once the job has finished, the success route is triggered.
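    
    To make the polling idea concrete, here is a minimal standalone sketch, assuming the org.json and commons-io libraries are on the classpath; the class name, the batch URL, and the readJson helper are hypothetical illustrations, not the processor's actual code:
    
    // Hypothetical sketch: poll a Livy batch URL until the job leaves the
    // starting/running states, mirroring the state handling in the diff above.
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import org.apache.commons.io.IOUtils;
    import org.json.JSONObject;
    
    public class LivyBatchPoller {
    
        static JSONObject readJson(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestProperty("Accept", "application/json");
            try (InputStream in = conn.getInputStream()) {
                return new JSONObject(IOUtils.toString(in, "UTF-8"));
            } finally {
                conn.disconnect();
            }
        }
    
        public static void main(String[] args) throws Exception {
            String batchUrl = "http://livy-host:8998/batches/42"; // hypothetical job URL
            long statusCheckInterval = 5000L;                     // poll every 5 s
    
            while (true) {
                JSONObject jobInfo = readJson(batchUrl);
                String state = jobInfo.getString("state");
                if ("success".equals(state)) {          // processor would transfer to REL_SUCCESS
                    System.out.println("Batch finished: " + jobInfo);
                    break;
                } else if ("dead".equals(state)) {      // processor would transfer to REL_FAILURE
                    System.out.println("Batch failed: " + jobInfo);
                    break;
                }
                Thread.sleep(statusCheckInterval);      // still starting/running, poll again
            }
        }
    }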
    
    That is the reason I made the assumption that if the incoming file is JSON, it came from a previous batch job submission.
    
    In short, the flow file (a sketch of this dispatch follows the list):
    - Is considered a triggering point (or)
    - Is considered plain Spark code that compiles over Livy (or)
    - Is a Livy JSON response, which can further be used to check the status of a long-running Spark batch job
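    
    A minimal sketch of that three-way dispatch, assuming a re-routed Livy status response can be recognized by its "url" and "state" fields; FlowFileClassifier, FlowFileKind, and classify are hypothetical names, not the processor's actual code:
    
    import org.json.JSONException;
    import org.json.JSONObject;
    
    public class FlowFileClassifier {
    
        enum FlowFileKind { TRIGGER, SPARK_CODE, LIVY_RESPONSE }
    
        static FlowFileKind classify(String content) {
            if (content == null || content.trim().isEmpty()) {
                return FlowFileKind.TRIGGER;            // empty content: just a trigger
            }
            try {
                JSONObject obj = new JSONObject(content);
                if (obj.has("url") && obj.has("state")) {
                    return FlowFileKind.LIVY_RESPONSE;  // re-routed batch-status JSON
                }
            } catch (JSONException e) {
                // not JSON, fall through
            }
            return FlowFileKind.SPARK_CODE;             // plain Spark code to run over Livy
        }
    
        public static void main(String[] args) {
            System.out.println(classify(""));                  // TRIGGER
            System.out.println(classify("print(\"hello\")"));  // SPARK_CODE
            System.out.println(classify(
                "{\"url\":\"http://livy-host:8998/batches/42\",\"state\":\"running\"}")); // LIVY_RESPONSE
        }
    }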
    
    What would be the right NiFi way of handling this?
    
    I feel I am being too conservative, trying to fit all of this functionality into one processor (see the payload sketch below):
    - A flow file/property can be used to run Spark code
    - Pyfiles can be used to run a Spark batch job
    - Jars can be used to run a Spark batch job
    - Args options for batch mode
    - By re-routing success back to itself, it can monitor the long-running job over the Livy REST APIs
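
    For the batch options above, the payload could also be assembled with a JSON library instead of string concatenation. A sketch, assuming org.json; the builder name and parameters are illustrative, and only "file" is mandatory in Livy's POST /batches body:
    
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.json.JSONArray;
    import org.json.JSONObject;
    
    public class BatchPayloadBuilder {
    
        // Hypothetical helper: build a /batches request body covering
        // file, pyFiles, jars, args, and the application name.
        static String buildBatchPayload(String mainPyFile, List<String> pyFiles,
                                        List<String> jars, List<String> appArgs,
                                        String appName) {
            JSONObject payload = new JSONObject();
            payload.put("file", mainPyFile);                      // entry point, required
            if (!pyFiles.isEmpty()) payload.put("pyFiles", new JSONArray(pyFiles));
            if (!jars.isEmpty())    payload.put("jars", new JSONArray(jars));
            if (!appArgs.isEmpty()) payload.put("args", new JSONArray(appArgs));
            if (appName != null)    payload.put("name", appName);
            return payload.toString();                            // POST to <livyUrl>/batches
        }
    
        public static void main(String[] args) {
            // Illustrative values only
            String payload = buildBatchPayload(
                    "hdfs:///jobs/main.py",
                    Arrays.asList("hdfs:///jobs/utils.py"),
                    Collections.emptyList(),
                    Arrays.asList("--date", "2018-03-19"),
                    "nightly-aggregation");
            System.out.println(payload);
        }
    }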

