[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392271#comment-16392271 ]
ASF GitHub Bot commented on NIFI-4946:
--------------------------------------

Github user Mageswaran1989 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2521#discussion_r173349239

    --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
    @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
             String sessionId = livyController.get("sessionId");
             String livyUrl = livyController.get("livyUrl");
    -        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
    -        if (StringUtils.isEmpty(code)) {
    -            try (InputStream inputStream = session.read(flowFile)) {
    -                // If no code was provided, assume it is in the content of the incoming flow file
    -                code = IOUtils.toString(inputStream, charset);
    -            } catch (IOException ioe) {
    -                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
    -                flowFile = session.penalize(flowFile);
    -                session.transfer(flowFile, REL_FAILURE);
    -                return;
    -            }
    -        }
    -        code = StringEscapeUtils.escapeJavaScript(code);
    -        String payload = "{\"code\":\"" + code + "\"}";
    +
             try {
    -            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
    -            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
    -            if (result == null) {
    -                session.transfer(flowFile, REL_FAILURE);
    -            } else {
    +
    +            if (isBatchJob) {
    +
    +                String jsonResponse = null;
    +
    +                if (StringUtils.isEmpty(jsonResponse)) {
    --- End diff --
    
    Once the current approach is accepted, this can be taken care of.

> nifi-spark-bundle : Adding support for pyfiles, file, jars options
> ------------------------------------------------------------------
>
>                 Key: NIFI-4946
>                 URL: https://issues.apache.org/jira/browse/NIFI-4946
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>         Environment: Ubuntu 16.04, IntelliJ
>            Reporter: Mageswaran
>          Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: nifi-spark-options.png, nifi-spark.png
>
>
> Adding support for submitting PySpark-based Spark jobs (which are normally
> structured as modules) over Livy on the existing "ExecuteSparkInteractive"
> processor.
> This is done by reading file paths for pyfiles and file, and an option from
> the user indicating whether the processor should trigger a batch job or not.
> [https://livy.incubator.apache.org/docs/latest/rest-api.html]
> *Current Workflow Logic ([https://github.com/apache/nifi/pull/2521])*
> * Check whether the processor has to handle code or submit a Spark job
> * Read the incoming flow file
> ** If batch == true
> *** If the flow file matches a Livy `batches` JSON response (through the `wait` loop)
> **** Wait for the Status Check Interval
> **** Read the state
> **** If the state is `running`, route it to `wait`; if it is `success` or
> `dead`, route it accordingly
> *** Else
> **** Ignore the flow file
> **** Trigger the Spark job over the Livy `batches` endpoint
> **** Read the state of the submitted job
> **** If the state is `running`, route it to `wait`; if it is `success` or
> `dead`, route it accordingly
> ** Else:
> *** Existing logic to handle `Code`
>
> !nifi-spark-options.png!
> !nifi-spark.png!
>
> Thanks.

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
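The batch workflow described in the issue (build a `batches` payload with `file`/`pyFiles`, then route on the reported state) can be sketched as a small standalone helper. This is an illustrative sketch only: the class and method names (`LivyBatchRouting`, `routeForState`, `buildBatchPayload`) are hypothetical and are not the processor's actual API, and real code would use a JSON library and NiFi relationships rather than hand-built strings.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the routing logic described in the issue;
// not the actual ExecuteSparkInteractive implementation.
public class LivyBatchRouting {

    // States Livy reports for a finished batch.
    static final List<String> TERMINAL_STATES = Arrays.asList("success", "dead");

    // Mirror the workflow's decision: `running` keeps waiting (re-check
    // after the Status Check Interval), terminal states route accordingly.
    static String routeForState(String state) {
        if ("running".equals(state) || "starting".equals(state)) {
            return "wait";
        }
        if (TERMINAL_STATES.contains(state)) {
            return state; // success or dead
        }
        return "wait"; // any other intermediate state: keep polling
    }

    // Build a minimal JSON payload for POST /batches per the Livy REST API:
    // `file` is the main artifact, `pyFiles` carries supporting Python modules.
    static String buildBatchPayload(String file, List<String> pyFiles) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"file\":\"").append(file).append("\"");
        if (!pyFiles.isEmpty()) {
            sb.append(",\"pyFiles\":[");
            for (int i = 0; i < pyFiles.size(); i++) {
                if (i > 0) sb.append(",");
                sb.append("\"").append(pyFiles.get(i)).append("\"");
            }
            sb.append("]");
        }
        sb.append("}");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(routeForState("running"));  // wait
        System.out.println(routeForState("dead"));     // dead
        System.out.println(buildBatchPayload("hdfs:///jobs/main.py",
                Arrays.asList("hdfs:///jobs/utils.py")));
    }
}
```

In the real processor these route names would correspond to relationships such as REL_WAIT, REL_SUCCESS, and REL_FAILURE, and the payload would be posted to the Livy `batches` endpoint documented at the REST API link above.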