[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392281#comment-16392281 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173350325 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Mapheaders = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); +
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173350325 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + 
jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Mapheaders = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +switch (state) { +case "success": +log.debug(" > success State: " + state); +session.transfer(flowFile, REL_SUCCESS);
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392271#comment-16392271 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349239 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- Once current approach is accepted this can be taken care > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions 
>Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349239 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- Once current approach is accepted this can be taken care ---
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392268#comment-16392268 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349052 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() --- End diff -- Like said before, not yet considered. 
We just wanted to get the hang of the code with our basic requirements > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Spark jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > the user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173349052 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() --- End diff -- Like said before, not yet considered. We just wanted to get a hang of the code with our basic requirements ---
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392267#comment-16392267 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348751 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") --- End diff -- Sure, will do that. 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348751 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") --- End diff -- Sure, will do that. ---
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348709 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + 
jsonResponse); --- End diff -- Sure, this will be removed in the next commit. ---
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348636 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -129,7 +185,13 @@ public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); +properties.add(IS_BATCH_JOB); +properties.add(PY_FILES); +//properties.add(JAR_FILES); +properties.add(MAIN_PY_FILE); +properties.add(NAME); properties.add(CODE); +//properties.add(ARGS); --- End diff -- Only `pyfiles` and `file` options are tested. The rest are yet to be tested. The plan was to go with implementing test modules and test other features, since the current manual testing takes a long routine of compile, copy and restart of NiFi. ---
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392266#comment-16392266 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348709 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); --- End diff -- Sure, this will be removed in the next commit. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392264#comment-16392264 ] ASF GitHub Bot commented on NIFI-4946: -- Github user Mageswaran1989 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173348636 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -129,7 +185,13 @@ public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); +properties.add(IS_BATCH_JOB); +properties.add(PY_FILES); +//properties.add(JAR_FILES); +properties.add(MAIN_PY_FILE); +properties.add(NAME); properties.add(CODE); +//properties.add(ARGS); --- End diff -- Only `pyfiles` and `file` options are tested. Rest are yet to be tested. Plan was to go with implementing test modules and test other features, since the current manual testing takes a long routine of compile, copy and restart of the Nifi. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. 
> [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392245#comment-16392245 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345509 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346825 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +TestRunner runner = TestRunners.newTestRunner(proc); +runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); +runner.setProperty(SolrUtils.SOLR_LOCATION,
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392253#comment-16392253 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346578 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344378 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -66,6 +67,15 @@ public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( "Standard", "Standard", "A stand-alone Solr instance."); +public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor --- End diff -- I think this will work for now, but it would be great to have the Solr CRUD functions moved over to a controller service. ---
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346578 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347073 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +TestRunner runner = TestRunners.newTestRunner(proc); +runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); +runner.setProperty(SolrUtils.SOLR_LOCATION,
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392250#comment-16392250 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345859 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345203 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392244#comment-16392244 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344830 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392246#comment-16392246 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345203 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392247#comment-16392247 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344971 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392255#comment-16392255 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345878 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347053 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +TestRunner runner = TestRunners.newTestRunner(proc); +runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); +runner.setProperty(SolrUtils.SOLR_LOCATION,
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344971 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345878 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392248#comment-16392248 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345772 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392249#comment-16392249 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347024 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392251#comment-16392251 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346459 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392256#comment-16392256 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347053 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392252#comment-16392252 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347073 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345509 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345859 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392254#comment-16392254 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346825 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173346459 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392243#comment-16392243 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344378 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -66,6 +67,15 @@ public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue( "Standard", "Standard", "A stand-alone Solr instance."); +public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor --- End diff -- I think this will work for now, but it would be great to have the Solr CRUD functions moved over to a controller service. > Add FetchSolr processor > --- > > Key: NIFI-4516 > URL: https://issues.apache.org/jira/browse/NIFI-4516 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Johannes Peter >Assignee: Johannes Peter >Priority: Major > Labels: features > > The processor shall be capable > * to query Solr within a workflow, > * to make use of standard functionalities of Solr such as faceting, > highlighting, result grouping, etc., > * to make use of NiFis expression language to build Solr queries, > * to handle results as records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173344830 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173347024 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestFetchSolr.java --- @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonReader; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.xmlunit.matchers.CompareMatcher; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Locale; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class TestFetchSolr { +static final String DEFAULT_SOLR_CORE = "testCollection"; + +private static final SimpleDateFormat df = new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); +static { +df.setTimeZone(TimeZone.getTimeZone("GMT")); +} + +private SolrClient solrClient; + +@Before +public void setup() { + +try { + +// create an EmbeddedSolrServer for the processor to use +String relPath = getClass().getProtectionDomain().getCodeSource() +.getLocation().getFile() + "../../target"; + +solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, +DEFAULT_SOLR_CORE, relPath); + +for (int i = 0; i < 10; i++) { +SolrInputDocument doc = new SolrInputDocument(); +doc.addField("id", "doc" + i); +Date date = new Date(); +doc.addField("created", df.format(date)); +doc.addField("string_single", "single" + i + ".1"); 
+doc.addField("string_multi", "multi" + i + ".1"); +doc.addField("string_multi", "multi" + i + ".2"); +doc.addField("integer_single", i); +doc.addField("integer_multi", 1); +doc.addField("integer_multi", 2); +doc.addField("integer_multi", 3); +doc.addField("double_single", 0.5 + i); + +solrClient.add(doc); +System.out.println(doc.getField("created").getValue()); + +} +solrClient.commit(); +} catch (Exception e) { +e.printStackTrace(); +Assert.fail(e.getMessage()); +} +} + +@After +public void teardown() { +try { +solrClient.close(); +} catch (Exception e) { +} +} + +@Test +public void testAllFacetCategories() throws IOException { +final TestableProcessor proc = new TestableProcessor(solrClient); + +TestRunner runner = TestRunners.newTestRunner(proc); +runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue()); +runner.setProperty(SolrUtils.SOLR_LOCATION,
[GitHub] nifi pull request #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2517#discussion_r173345772 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/FetchSolr.java --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.solr; + +import com.google.gson.stream.JsonWriter; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FieldStatsInfo; +import org.apache.solr.client.solrj.response.IntervalFacet; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.RangeFacet; +import org.apache.solr.client.solrj.response.RangeFacet.Count; +import org.apache.solr.common.params.FacetParams; +import org.apache.solr.common.params.MultiMapSolrParams; +import 
org.apache.solr.common.params.StatsParams; +import org.apache.solr.servlet.SolrRequestParsers; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.RECORD_WRITER; + +@Tags({"Apache", "Solr", "Get", "Query", "Records"})
[jira] [Commented] (NIFI-4516) Add FetchSolr processor
[ https://issues.apache.org/jira/browse/NIFI-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16392225#comment-16392225 ] ASF GitHub Bot commented on NIFI-4516: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2517 You should consider making the output format configurable. The Solr projects I've worked on in the past have used JSON instead of XML. > Add FetchSolr processor > --- > > Key: NIFI-4516 > URL: https://issues.apache.org/jira/browse/NIFI-4516 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Johannes Peter >Assignee: Johannes Peter >Priority: Major > Labels: features > > The processor shall be capable > * to query Solr within a workflow, > * to make use of standard functionalities of Solr such as faceting, > highlighting, result grouping, etc., > * to make use of NiFis expression language to build Solr queries, > * to handle results as records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2517: NIFI-4516 FetchSolr Processor
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2517 You should consider making the output format configurable. The Solr projects I've worked on in the past have used JSON instead of XML. ---
[GitHub] nifi issue #2519: NIFI-4944: Guard against race condition in Snappy for PutH...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2519 +1 LGTM. Unit tests ran and the solution looks like a sensible and effective one. ---
[jira] [Commented] (NIFI-4949) Convert MongoDB lookup service unit tests to integration tests (where appropriate)
[ https://issues.apache.org/jira/browse/NIFI-4949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391928#comment-16391928 ] ASF GitHub Bot commented on NIFI-4949: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2525 @pvillard31 Did the same thing here that I did with the MongoDB integration test ticket you closed today. > Convert MongoDB lookup service unit tests to integration tests (where > appropriate) > -- > > Key: NIFI-4949 > URL: https://issues.apache.org/jira/browse/NIFI-4949 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2525: NIFI-4949 Converted nifi-mongodb-services' unit tests into...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2525 @pvillard31 Did the same thing here that I did with the MongoDB integration test ticket you closed today. ---
[GitHub] nifi pull request #2526: NIFI-4951: Update convertToAvroObject to use the Da...
GitHub user derekstraka opened a pull request: https://github.com/apache/nifi/pull/2526 NIFI-4951: Update convertToAvroObject to use the DataTypeUtils conversion function The feature allows users to convert from non-integral types to the correct underlying type. The original behavior is maintained; however, now simple conversions take place automatically for some logical types (date, time, and timestamp). Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [X] Has your PR been rebased against the latest commit within the target branch (typically master)? - [X] Is your initial contribution a single, squashed commit? ### For code changes: - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [X] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/derekstraka/nifi fix-misc-avro-conversions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2526.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2526 commit 28b50ff985d9db9a6ef0626a182e3fc0ab5e4bfd Author: Derek Straka Date: 2018-03-08T21:10:02Z NIFI-4951: Update convertToAvroObject to use the DataTypeUtils conversion function The feature allows users to convert from non-integral types to the correct underlying type. The original behavior is maintained; however, now simple conversions take place automatically for some logical types (date, time, and timestamp). Signed-off-by: Derek Straka ---
[jira] [Created] (NIFI-4951) convertToAvroObject does not utilize AvroTypeUtil when trying to convert Date, Time, or Timestamps
Derek Straka created NIFI-4951: -- Summary: convertToAvroObject does not utilize AvroTypeUtil when trying to convert Date, Time, or Timestamps Key: NIFI-4951 URL: https://issues.apache.org/jira/browse/NIFI-4951 Project: Apache NiFi Issue Type: Improvement Reporter: Derek Straka Assignee: Derek Straka Currently, AvroTypeUtil::convertToAvroObject assumes that the logical types have already been converted into their internal representation. While this may sometimes be the case, it would be useful to allow String->Logical type conversion. The feature can be added by using the appropriate DataTypeUtils::to method rather than blindly attempting to use the underlying type. The DataTypeUtils already performs the integral type parsing, so the existing functionality remains intact. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-4950) MergeContent: Defragment can improperly reassemble
Brandon DeVries created NIFI-4950: - Summary: MergeContent: Defragment can improperly reassemble Key: NIFI-4950 URL: https://issues.apache.org/jira/browse/NIFI-4950 Project: Apache NiFi Issue Type: Bug Components: Extensions Affects Versions: 1.5.0 Reporter: Brandon DeVries In Defragment mode, MergeContent can improperly reassemble the pieces of a split file. I understand this was previously discussed in NIFI-378, and the outcome was to update the documentation for fragment.index [1]: {quote} Applicable only if the property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the behavior of this Processor is undefined. {quote} I believe this could (and probably should) be improved upon. Specifically, the discussion around NIFI-378 focused on the "improper" use of MergeContent, in using the same fragment.identifier to "pair up" files. The situation I've encountered isn't really unusual in any way... I have a file, being split and sent via PostHTTP to another nifi instance. If something "goes wrong", the sending NiFi may not get an acknowledgement of success even if the file made it to the receiving NiFi. It then sends the segment again. NiFi favors duplication over loss, so this is not unexpected. However, I now have a file broken into X fragments arriving on the other side as X+1 (or more). The reassembly may work... or both duplicates may be chosen, and result in an incorrectly recreated file. 
To satisfy the contract as it exists, you would need to use a DetectDuplicate before the MergeContent to filter these out. However, that could potentially incur a great deal of overhead. In contrast, simply checking that there are no duplicate fragment id's in a bin should be relatively straightforward. How to handle duplicates is a legitimate question... are they ignored, or are they discarded (if they're actually the same)? If the duplicate id's aren't identical, what is the behavior? Personally, I would say if you have actual duplicates, drop one and continue with the merge... if you have unequal "duplicates", fail the bin. But there's room for discussion there. The point is, in this circumstance it is very easy for a user to do a very reasonable thing and end up with a corrupt file for reasons that are somewhat esoteric. Then, we would need to explain to them why "defragment" doesn't actually defragment, but just kind of sorts a bin of matching things. I think we can do better than that. [1] [http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.MergeContent/index.html] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4885) More granular restricted component categories
[ https://issues.apache.org/jira/browse/NIFI-4885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391892#comment-16391892 ] ASF GitHub Bot commented on NIFI-4885: -- Github user andrewmlim commented on the issue: https://github.com/apache/nifi/pull/2515 @mcgilman Looks good! Thanks for including a fix for MoveHDFS. > More granular restricted component categories > - > > Key: NIFI-4885 > URL: https://issues.apache.org/jira/browse/NIFI-4885 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework, Core UI >Reporter: Matt Gilman >Assignee: Matt Gilman >Priority: Major > > Update the Restricted annotation to support more granular categories. > Available categories will map to new access policies. Example categories and > their corresponding access policies may be > * read-filesystem (/restricted-components/read-filesystem) > * write-filesystem (/restricted-components/write-filesystem) > * code-execution (/restricted-components/code-execution) > * keytab-access (/restricted-components/keytab-access) > The hierarchical nature of the access policies will support backward > compatibility with existing installations where the policy of > /restricted-components was used to enforce all subcategories. Any users with > /restricted-components permissions will be granted access to all > subcategories. In order to leverage the new granular categories, an > administrator will need to use NiFi to update their access policies (remove a > user from /restricted-components and place them into the desired subcategory) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2515: NIFI-4885: Granular component restrictions
Github user andrewmlim commented on the issue: https://github.com/apache/nifi/pull/2515 @mcgilman Looks good! Thanks for including a fix for MoveHDFS. ---
[jira] [Closed] (NIFIREG-124) As a user I want the sidenav table sorting to persist when I open dialogs.
[ https://issues.apache.org/jira/browse/NIFIREG-124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Aslan closed NIFIREG-124. --- Resolution: Fixed > As a user I want the sidenav table sorting to persist when I open dialogs. > -- > > Key: NIFIREG-124 > URL: https://issues.apache.org/jira/browse/NIFIREG-124 > Project: NiFi Registry > Issue Type: Bug >Reporter: Scott Aslan >Assignee: Scott Aslan >Priority: Major > > When editing a group, if the user sorts the listed users in the group table > to be descending, but then click Add User button, the sort order in the users > in group table switches back to ascending, even if I make no change in the > Add User dialog. This bug also exists in when editing a user as well as when > editing a bucket. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (NIFIREG-100) FDS theme SASS mixin
[ https://issues.apache.org/jira/browse/NIFIREG-100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Scott Aslan closed NIFIREG-100. --- Resolution: Fixed > FDS theme SASS mixin > > > Key: NIFIREG-100 > URL: https://issues.apache.org/jira/browse/NIFIREG-100 > Project: NiFi Registry > Issue Type: New Feature >Reporter: Scott Aslan >Assignee: Scott Aslan >Priority: Major > > As a developer I want the ability to programmatically change the theme of the > FDS NgModule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4870) Upgrade ActiveMQ dependencies
[ https://issues.apache.org/jira/browse/NIFI-4870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391852#comment-16391852 ] ASF GitHub Bot commented on NIFI-4870: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2469 Will review... > Upgrade ActiveMQ dependencies > - > > Key: NIFI-4870 > URL: https://issues.apache.org/jira/browse/NIFI-4870 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Andy LoPresto >Assignee: Andy LoPresto >Priority: Major > Labels: activemq, dependencies, jms > > Upgrade the version of {{activemq-client}} to 5.14.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2469: NIFI-4870 Upgraded activemq-client and activemq-broker ver...
Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2469 Will review... ---
[jira] [Updated] (NIFI-4835) Incorrect return type specified for registries/{registry-id}/buckets/{bucket-id}/flows
[ https://issues.apache.org/jira/browse/NIFI-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Doran updated NIFI-4835: -- Resolution: Fixed Status: Resolved (was: Patch Available) > Incorrect return type specified for > registries/{registry-id}/buckets/{bucket-id}/flows > -- > > Key: NIFI-4835 > URL: https://issues.apache.org/jira/browse/NIFI-4835 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Assignee: Kevin Doran >Priority: Major > Labels: swagger > Fix For: 1.6.0 > > > On > [https://github.com/apache/nifi/blob/b6117743d4c1c1a37a16ba746b9edbbdd276d69f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java#L1376] > {{response = BucketsEntity.class}} > should likely be > {{response = VersionedFlowsEntity.class}} > > same copy/paste error on line 1412 also for versions, although that should be > {{VersionedFlowSnapshotMetadataSetEntity.class}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-4859) Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state
[ https://issues.apache.org/jira/browse/NIFI-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Doran updated NIFI-4859: -- Resolution: Fixed Status: Resolved (was: Patch Available) > Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state > > > Key: NIFI-4859 > URL: https://issues.apache.org/jira/browse/NIFI-4859 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Daniel Chaffelson >Assignee: Kevin Doran >Priority: Minor > Labels: swagger > Fix For: 1.6.0 > > > It is possible to get a Versioned Process Group into a SYNC_FAILURE state, > but this is not an allowable state in the code generated from the swagger.json > This prevents versioned objects from being manipulated via the API in some > use cases. > {noformat} > @state.setter > def state(self, state): > """ > Sets the state of this VersionControlInformationDTO. > The current state of the Process Group, as it relates to the Versioned Flow > :param state: The state of this VersionControlInformationDTO. > :type: str > """ > allowed_values = ["LOCALLY_MODIFIED_DESCENDANT", "LOCALLY_MODIFIED", "STALE", > "LOCALLY_MODIFIED_AND_STALE", "UP_TO_DATE"] > if state not in allowed_values: > raise ValueError( > "Invalid value for `state` ({0}), must be one of {1}" > > .format(state, allowed_values) > ) > E ValueError: Invalid value for `state` (SYNC_FAILURE), must be one of > ['LOCALLY_MODIFIED_DESCENDANT', 'LOCALLY_MODIFIED', 'STALE', > 'LOCALLY_MODIFIED_AND_STALE', 'UP_TO_DATE'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4859) Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state
[ https://issues.apache.org/jira/browse/NIFI-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391831#comment-16391831 ] ASF GitHub Bot commented on NIFI-4859: -- Github user kevdoran closed the pull request at: https://github.com/apache/nifi/pull/2479 > Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state > > > Key: NIFI-4859 > URL: https://issues.apache.org/jira/browse/NIFI-4859 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Daniel Chaffelson >Assignee: Kevin Doran >Priority: Minor > Labels: swagger > Fix For: 1.6.0 > > > It is possible to get a Versioned Process Group into a SYNC_FAILURE state, > but this is not an allowable state in the code generated from the swagger.json > This prevents versioned objects from being manipulated via the API in some > use cases. > {noformat} > @state.setter > def state(self, state): > """ > Sets the state of this VersionControlInformationDTO. > The current state of the Process Group, as it relates to the Versioned Flow > :param state: The state of this VersionControlInformationDTO. > :type: str > """ > allowed_values = ["LOCALLY_MODIFIED_DESCENDANT", "LOCALLY_MODIFIED", "STALE", > "LOCALLY_MODIFIED_AND_STALE", "UP_TO_DATE"] > if state not in allowed_values: > raise ValueError( > "Invalid value for `state` ({0}), must be one of {1}" > > .format(state, allowed_values) > ) > E ValueError: Invalid value for `state` (SYNC_FAILURE), must be one of > ['LOCALLY_MODIFIED_DESCENDANT', 'LOCALLY_MODIFIED', 'STALE', > 'LOCALLY_MODIFIED_AND_STALE', 'UP_TO_DATE'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2525: NIFI-4949 Converted nifi-mongodb-services' unit tes...
GitHub user MikeThomsen opened a pull request: https://github.com/apache/nifi/pull/2525 NIFI-4949 Converted nifi-mongodb-services' unit tests into integratio… …n tests so that the @Ignore annotation doesn't have to be removed to make them run. Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/MikeThomsen/nifi NIFI-4949 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2525.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2525 commit b09410d15538288b662df3e9b7a21332a0462397 Author: Mike Thomsen Date: 2018-03-08T19:46:25Z NIFI-4949 Converted nifi-mongodb-services' unit tests into integration tests so that the @Ignore annotation doesn't have to be removed to make them run. ---
[jira] [Commented] (NIFI-4949) Convert MongoDB lookup service unit tests to integration tests (where appropriate)
[ https://issues.apache.org/jira/browse/NIFI-4949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391835#comment-16391835 ] ASF GitHub Bot commented on NIFI-4949: -- GitHub user MikeThomsen opened a pull request: https://github.com/apache/nifi/pull/2525 NIFI-4949 Converted nifi-mongodb-services' unit tests into integratio… …n tests so that the @Ignore annotation doesn't have to be removed to make them run. Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MikeThomsen/nifi NIFI-4949 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2525.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2525 commit b09410d15538288b662df3e9b7a21332a0462397 Author: Mike ThomsenDate: 2018-03-08T19:46:25Z NIFI-4949 Converted nifi-mongodb-services' unit tests into integration tests so that the @Ignore annotation doesn't have to be removed to make them run. > Convert MongoDB lookup service unit tests to integration tests (where > appropriate) > -- > > Key: NIFI-4949 > URL: https://issues.apache.org/jira/browse/NIFI-4949 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4859) Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state
[ https://issues.apache.org/jira/browse/NIFI-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391827#comment-16391827 ] ASF GitHub Bot commented on NIFI-4859: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2479 Thanks @kevdoran! This has been merged to master. > Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state > > > Key: NIFI-4859 > URL: https://issues.apache.org/jira/browse/NIFI-4859 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Daniel Chaffelson >Assignee: Kevin Doran >Priority: Minor > Labels: swagger > Fix For: 1.6.0 > > > It is possible to get a Versioned Process Group into a SYNC_FAILURE state, > but this is not an allowable state in the code generated from the swagger.json > This prevents versioned objects from being manipulated via the API in some > use cases. > {noformat} > @state.setter > def state(self, state): > """ > Sets the state of this VersionControlInformationDTO. > The current state of the Process Group, as it relates to the Versioned Flow > :param state: The state of this VersionControlInformationDTO. > :type: str > """ > allowed_values = ["LOCALLY_MODIFIED_DESCENDANT", "LOCALLY_MODIFIED", "STALE", > "LOCALLY_MODIFIED_AND_STALE", "UP_TO_DATE"] > if state not in allowed_values: > raise ValueError( > "Invalid value for `state` ({0}), must be one of {1}" > > .format(state, allowed_values) > ) > E ValueError: Invalid value for `state` (SYNC_FAILURE), must be one of > ['LOCALLY_MODIFIED_DESCENDANT', 'LOCALLY_MODIFIED', 'STALE', > 'LOCALLY_MODIFIED_AND_STALE', 'UP_TO_DATE'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2479: NIFI-4859 NIFI-4835 Swagger spec fixes
Github user kevdoran commented on the issue: https://github.com/apache/nifi/pull/2479 Thanks! ---
[GitHub] nifi pull request #2479: NIFI-4859 NIFI-4835 Swagger spec fixes
Github user kevdoran closed the pull request at: https://github.com/apache/nifi/pull/2479 ---
[jira] [Commented] (NIFI-4859) Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state
[ https://issues.apache.org/jira/browse/NIFI-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391829#comment-16391829 ] ASF GitHub Bot commented on NIFI-4859: -- Github user kevdoran commented on the issue: https://github.com/apache/nifi/pull/2479 Thanks! > Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state > > > Key: NIFI-4859 > URL: https://issues.apache.org/jira/browse/NIFI-4859 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Daniel Chaffelson >Assignee: Kevin Doran >Priority: Minor > Labels: swagger > Fix For: 1.6.0 > > > It is possible to get a Versioned Process Group into a SYNC_FAILURE state, > but this is not an allowable state in the code generated from the swagger.json > This prevents versioned objects from being manipulated via the API in some > use cases. > {noformat} > @state.setter > def state(self, state): > """ > Sets the state of this VersionControlInformationDTO. > The current state of the Process Group, as it relates to the Versioned Flow > :param state: The state of this VersionControlInformationDTO. > :type: str > """ > allowed_values = ["LOCALLY_MODIFIED_DESCENDANT", "LOCALLY_MODIFIED", "STALE", > "LOCALLY_MODIFIED_AND_STALE", "UP_TO_DATE"] > if state not in allowed_values: > raise ValueError( > "Invalid value for `state` ({0}), must be one of {1}" > > .format(state, allowed_values) > ) > E ValueError: Invalid value for `state` (SYNC_FAILURE), must be one of > ['LOCALLY_MODIFIED_DESCENDANT', 'LOCALLY_MODIFIED', 'STALE', > 'LOCALLY_MODIFIED_AND_STALE', 'UP_TO_DATE'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4859) Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state
[ https://issues.apache.org/jira/browse/NIFI-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391825#comment-16391825 ] ASF subversion and git services commented on NIFI-4859: --- Commit fdca7d0a01000dae4c32792c4ae8e974ac97e673 in nifi's branch refs/heads/master from [~kdoran] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=fdca7d0 ] NIFI-4859 Corrects Swagger Spec VersionedFlowState allowableValues > Swagger Spec VersionControlInformationDTO missing SYNC_FAILURE state > > > Key: NIFI-4859 > URL: https://issues.apache.org/jira/browse/NIFI-4859 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Daniel Chaffelson >Assignee: Kevin Doran >Priority: Minor > Labels: swagger > Fix For: 1.6.0 > > > It is possible to get a Versioned Process Group into a SYNC_FAILURE state, > but this is not an allowable state in the code generated from the swagger.json > This prevents versioned objects from being manipulated via the API in some > use cases. > {noformat} > @state.setter > def state(self, state): > """ > Sets the state of this VersionControlInformationDTO. > The current state of the Process Group, as it relates to the Versioned Flow > :param state: The state of this VersionControlInformationDTO. > :type: str > """ > allowed_values = ["LOCALLY_MODIFIED_DESCENDANT", "LOCALLY_MODIFIED", "STALE", > "LOCALLY_MODIFIED_AND_STALE", "UP_TO_DATE"] > if state not in allowed_values: > raise ValueError( > "Invalid value for `state` ({0}), must be one of {1}" > > .format(state, allowed_values) > ) > E ValueError: Invalid value for `state` (SYNC_FAILURE), must be one of > ['LOCALLY_MODIFIED_DESCENDANT', 'LOCALLY_MODIFIED', 'STALE', > 'LOCALLY_MODIFIED_AND_STALE', 'UP_TO_DATE'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2479: NIFI-4859 NIFI-4835 Swagger spec fixes
Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2479 Thanks @kevdoran! This has been merged to master. ---
[jira] [Commented] (NIFI-4835) Incorrect return type specified for registries/{registry-id}/buckets/{bucket-id}/flows
[ https://issues.apache.org/jira/browse/NIFI-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391826#comment-16391826 ] ASF subversion and git services commented on NIFI-4835: --- Commit 79fa4ad46fceaa8393d8cdc22d25c1d6ac8b47dc in nifi's branch refs/heads/master from [~kdoran] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=79fa4ad ] NIFI-4835 Corrects Swagger spec response types in FlowResource > Incorrect return type specified for > registries/{registry-id}/buckets/{bucket-id}/flows > -- > > Key: NIFI-4835 > URL: https://issues.apache.org/jira/browse/NIFI-4835 > Project: Apache NiFi > Issue Type: Bug > Components: Flow Versioning >Affects Versions: 1.5.0 >Reporter: Charlie Meyer >Assignee: Kevin Doran >Priority: Major > Labels: swagger > Fix For: 1.6.0 > > > On > [https://github.com/apache/nifi/blob/b6117743d4c1c1a37a16ba746b9edbbdd276d69f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java#L1376] > {{response = BucketsEntity.class}} > should likely be > {{response = VersionedFlowsEntity.class}} > > same copy/paste error on line 1412 also for versions, although that should be > {{VersionedFlowSnapshotMetadataSetEntity.class}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-4949) Convert MongoDB lookup service unit tests to integration tests (where appropriate)
Mike Thomsen created NIFI-4949: -- Summary: Convert MongoDB lookup service unit tests to integration tests (where appropriate) Key: NIFI-4949 URL: https://issues.apache.org/jira/browse/NIFI-4949 Project: Apache NiFi Issue Type: Improvement Reporter: Mike Thomsen Assignee: Mike Thomsen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4945) In Nifi 1.5, START_TLS in combination with LDAP will allow any password during auth
[ https://issues.apache.org/jira/browse/NIFI-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391744#comment-16391744 ] ASF GitHub Bot commented on NIFI-4945: -- Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2524 Reviewing... > In Nifi 1.5, START_TLS in combination with LDAP will allow any password > during auth > --- > > Key: NIFI-4945 > URL: https://issues.apache.org/jira/browse/NIFI-4945 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.5.0 > Environment: alpine docker, openjdk 8, jumpcloud ldp service >Reporter: Matthew Elder >Priority: Major > > In Nifi 1.5, START_TLS in combination with LDAP will allow any password > during auth > > This has to do with the login portion of the ldap integration and not the > groups aspect. > > START_TLS accepts any password (huge security hole!) > LDAPS,SIMPLE will not allow any password > > strange! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4870) Upgrade ActiveMQ dependencies
[ https://issues.apache.org/jira/browse/NIFI-4870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391743#comment-16391743 ] ASF GitHub Bot commented on NIFI-4870: -- Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2469 Updated copyright years to 2017 (as referenced in activemq [NOTICE](https://github.com/apache/activemq/blob/activemq-5.15.3/NOTICE)). > Upgrade ActiveMQ dependencies > - > > Key: NIFI-4870 > URL: https://issues.apache.org/jira/browse/NIFI-4870 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Andy LoPresto >Assignee: Andy LoPresto >Priority: Major > Labels: activemq, dependencies, jms > > Upgrade the version of {{activemq-client}} to 5.14.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2469: NIFI-4870 Upgraded activemq-client and activemq-broker ver...
Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2469 Updated copyright years to 2017 (as referenced in activemq [NOTICE](https://github.com/apache/activemq/blob/activemq-5.15.3/NOTICE)). ---
[GitHub] nifi issue #2524: NIFI-4945: Upgrading spring security version
Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2524 Reviewing... ---
[jira] [Commented] (NIFI-4945) In Nifi 1.5, START_TLS in combination with LDAP will allow any password during auth
[ https://issues.apache.org/jira/browse/NIFI-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391732#comment-16391732 ] ASF GitHub Bot commented on NIFI-4945: -- GitHub user mcgilman opened a pull request: https://github.com/apache/nifi/pull/2524 NIFI-4945: Upgrading spring security version NIFI-4945: - Upgrading spring security version. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcgilman/nifi NIFI-4945 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2524.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2524 commit 55fb548c28752d4c7fb3a7a3084a67b91ca60a07 Author: Matt GilmanDate: 2018-03-08T18:27:10Z NIFI-4945: - Upgrading spring security version. > In Nifi 1.5, START_TLS in combination with LDAP will allow any password > during auth > --- > > Key: NIFI-4945 > URL: https://issues.apache.org/jira/browse/NIFI-4945 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.5.0 > Environment: alpine docker, openjdk 8, jumpcloud ldp service >Reporter: Matthew Elder >Priority: Major > > In Nifi 1.5, START_TLS in combination with LDAP will allow any password > during auth > > This has to do with the login portion of the ldap integration and not the > groups aspect. > > START_TLS accepts any password (huge security hole!) > LDAPS,SIMPLE will not allow any password > > strange! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2524: NIFI-4945: Upgrading spring security version
GitHub user mcgilman opened a pull request: https://github.com/apache/nifi/pull/2524 NIFI-4945: Upgrading spring security version NIFI-4945: - Upgrading spring security version. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcgilman/nifi NIFI-4945 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2524.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2524 commit 55fb548c28752d4c7fb3a7a3084a67b91ca60a07 Author: Matt GilmanDate: 2018-03-08T18:27:10Z NIFI-4945: - Upgrading spring security version. ---
[jira] [Updated] (NIFI-4948) MongoDB Lookup Service throws an exception if there is no match
[ https://issues.apache.org/jira/browse/NIFI-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess updated NIFI-4948: --- Resolution: Fixed Status: Resolved (was: Patch Available) > MongoDB Lookup Service throws an exception if there is no match > --- > > Key: NIFI-4948 > URL: https://issues.apache.org/jira/browse/NIFI-4948 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Fix For: 1.6.0 > > > The LookupRecord processor states that in case of no match in the lookup > service, the FF will be routed either to success or to unmatched (depending > on the strategy). At the moment the MongoDB lookup service throws an > exception in case there is no match. It causes the FF to be routed to failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-4948) MongoDB Lookup Service throws an exception if there is no match
[ https://issues.apache.org/jira/browse/NIFI-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess updated NIFI-4948: --- Fix Version/s: 1.6.0 > MongoDB Lookup Service throws an exception if there is no match > --- > > Key: NIFI-4948 > URL: https://issues.apache.org/jira/browse/NIFI-4948 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Fix For: 1.6.0 > > > The LookupRecord processor states that in case of no match in the lookup > service, the FF will be routed either to success or to unmatched (depending > on the strategy). At the moment the MongoDB lookup service throws an > exception in case there is no match. It causes the FF to be routed to failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4948) MongoDB Lookup Service throws an exception if there is no match
[ https://issues.apache.org/jira/browse/NIFI-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391691#comment-16391691 ] ASF GitHub Bot commented on NIFI-4948: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2522 > MongoDB Lookup Service throws an exception if there is no match > --- > > Key: NIFI-4948 > URL: https://issues.apache.org/jira/browse/NIFI-4948 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Fix For: 1.6.0 > > > The LookupRecord processor states that in case of no match in the lookup > service, the FF will be routed either to success or to unmatched (depending > on the strategy). At the moment the MongoDB lookup service throws an > exception in case there is no match. It causes the FF to be routed to failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4948) MongoDB Lookup Service throws an exception if there is no match
[ https://issues.apache.org/jira/browse/NIFI-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391690#comment-16391690 ] ASF subversion and git services commented on NIFI-4948: --- Commit 1f8af1bde394e2ab4df4191fcb251555817e470a in nifi's branch refs/heads/master from [~pvillard] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=1f8af1b ] NIFI-4948 - MongoDB Lookup Service throws an exception if there is no match Signed-off-by: Matthew Burgess This closes #2522 > MongoDB Lookup Service throws an exception if there is no match > --- > > Key: NIFI-4948 > URL: https://issues.apache.org/jira/browse/NIFI-4948 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Fix For: 1.6.0 > > > The LookupRecord processor states that in case of no match in the lookup > service, the FF will be routed either to success or to unmatched (depending > on the strategy). At the moment the MongoDB lookup service throws an > exception in case there is no match. It causes the FF to be routed to failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2522: NIFI-4948 - MongoDB Lookup Service throws an except...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2522 ---
[jira] [Updated] (NIFI-4929) Convert existing MongoDB unit tests to integration tests
[ https://issues.apache.org/jira/browse/NIFI-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-4929: - Component/s: Tools and Build Extensions > Convert existing MongoDB unit tests to integration tests > > > Key: NIFI-4929 > URL: https://issues.apache.org/jira/browse/NIFI-4929 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions, Tools and Build >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Minor > Fix For: 1.6.0 > > > Most of the existing MongoDB unit tests are actually integration tests that > require a live installation of MongoDB to run. They're marked with @Ignore > which is a very suboptimal solution for testing and makes it easy for a > reviewer to run the tests and think everything passed if they're not looking > for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (NIFI-4929) Convert existing MongoDB unit tests to integration tests
[ https://issues.apache.org/jira/browse/NIFI-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard resolved NIFI-4929. -- Resolution: Fixed Fix Version/s: 1.6.0 > Convert existing MongoDB unit tests to integration tests > > > Key: NIFI-4929 > URL: https://issues.apache.org/jira/browse/NIFI-4929 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions, Tools and Build >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Minor > Fix For: 1.6.0 > > > Most of the existing MongoDB unit tests are actually integration tests that > require a live installation of MongoDB to run. They're marked with @Ignore > which is a very suboptimal solution for testing and makes it easy for a > reviewer to run the tests and think everything passed if they're not looking > for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4948) MongoDB Lookup Service throws an exception if there is no match
[ https://issues.apache.org/jira/browse/NIFI-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391683#comment-16391683 ] ASF GitHub Bot commented on NIFI-4948: -- Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2522 +1 LGTM, thanks to all for the reviews, and to Pierre for the improvement! Merging to master > MongoDB Lookup Service throws an exception if there is no match > --- > > Key: NIFI-4948 > URL: https://issues.apache.org/jira/browse/NIFI-4948 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.5.0 >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > > The LookupRecord processor states that in case of no match in the lookup > service, the FF will be routed either to success or to unmatched (depending > on the strategy). At the moment the MongoDB lookup service throws an > exception in case there is no match. It causes the FF to be routed to failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4929) Convert existing MongoDB unit tests to integration tests
[ https://issues.apache.org/jira/browse/NIFI-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391682#comment-16391682 ] ASF GitHub Bot commented on NIFI-4929: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2508 > Convert existing MongoDB unit tests to integration tests > > > Key: NIFI-4929 > URL: https://issues.apache.org/jira/browse/NIFI-4929 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions, Tools and Build >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Minor > Fix For: 1.6.0 > > > Most of the existing MongoDB unit tests are actually integration tests that > require a live installation of MongoDB to run. They're marked with @Ignore > which is a very suboptimal solution for testing and makes it easy for a > reviewer to run the tests and think everything passed if they're not looking > for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2522: NIFI-4948 - MongoDB Lookup Service throws an exception if ...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2522 +1 LGTM, thanks to all for the reviews, and to Pierre for the improvement! Merging to master ---
[GitHub] nifi pull request #2508: NIFI-4929 Converted the majority of MongoDB unit te...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2508 ---
[jira] [Commented] (NIFI-4929) Convert existing MongoDB unit tests to integration tests
[ https://issues.apache.org/jira/browse/NIFI-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391679#comment-16391679 ] ASF subversion and git services commented on NIFI-4929: --- Commit dae2b73d94f51230ff3193fe7baae657ff10ce82 in nifi's branch refs/heads/master from [~mike.thomsen] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=dae2b73 ] NIFI-4929 Converted the majority of MongoDB unit tests to integration tests so they can be reliably run with 'mvn -Pintegration-tests integration-test' Signed-off-by: Pierre Villard. This closes #2508. > Convert existing MongoDB unit tests to integration tests > > > Key: NIFI-4929 > URL: https://issues.apache.org/jira/browse/NIFI-4929 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Minor > > Most of the existing MongoDB unit tests are actually integration tests that > require a live installation of MongoDB to run. They're marked with @Ignore > which is a very suboptimal solution for testing and makes it easy for a > reviewer to run the tests and think everything passed if they're not looking > for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4929) Convert existing MongoDB unit tests to integration tests
[ https://issues.apache.org/jira/browse/NIFI-4929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391678#comment-16391678 ] ASF GitHub Bot commented on NIFI-4929: -- Github user pvillard31 commented on the issue: https://github.com/apache/nifi/pull/2508 +1, merging to master, thanks @MikeThomsen > Convert existing MongoDB unit tests to integration tests > > > Key: NIFI-4929 > URL: https://issues.apache.org/jira/browse/NIFI-4929 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Minor > > Most of the existing MongoDB unit tests are actually integration tests that > require a live installation of MongoDB to run. They're marked with @Ignore > which is a very suboptimal solution for testing and makes it easy for a > reviewer to run the tests and think everything passed if they're not looking > for that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2508: NIFI-4929 Converted the majority of MongoDB unit tests to ...
Github user pvillard31 commented on the issue: https://github.com/apache/nifi/pull/2508 +1, merging to master, thanks @MikeThomsen ---
[jira] [Commented] (NIFI-4838) Make GetMongo support multiple commits and give some progress indication
[ https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391659#comment-16391659 ] ASF GitHub Bot commented on NIFI-4838: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2448 @pvillard31 If you have time, could you take a look at this? > Make GetMongo support multiple commits and give some progress indication > > > Key: NIFI-4838 > URL: https://issues.apache.org/jira/browse/NIFI-4838 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Mike Thomsen >Assignee: Mike Thomsen >Priority: Major > > It shouldn't wait until the end to do a commit() call because the effect is > that GetMongo looks like it has hung to a user who is pulling a very large > data set. > It should also have an option for running a count query to get the current > approximate count of documents that would match the query and append an > attribute that indicates where a flowfile stands in the total result count. > Ex: > query.progress.point.start = 2500 > query.progress.point.end = 5000 > query.count.estimate = 17,568,231 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2448: NIFI-4838 Added configurable progressive commits to GetMon...
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2448 @pvillard31 If you have time, could you take a look at this? ---
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391545#comment-16391545 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173211644 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -129,7 +185,13 @@ public void init(final ProcessorInitializationContext context) { List properties = new ArrayList<>(); properties.add(LIVY_CONTROLLER_SERVICE); +properties.add(IS_BATCH_JOB); +properties.add(PY_FILES); +//properties.add(JAR_FILES); +properties.add(MAIN_PY_FILE); +properties.add(NAME); properties.add(CODE); +//properties.add(ARGS); --- End diff -- Comments to be removed? > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. 
> [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391551#comment-16391551 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173223531 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map<String, String> headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391552#comment-16391552 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173226045 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map<String, String> headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391553#comment-16391553 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216684 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-name") +.displayName("name") +.description("The name of this session") +.required(false) 
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor ARGS = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-args") +.displayName("args") +.description("The name of this session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-main-py-file") +.displayName("file") --- End diff -- Same as the JARs case. Most of the `PropertyDescriptor` use all lowercase characters for `displayName`. Please change it. > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ***
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391549#comment-16391549 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213146 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); --- End diff -- Cosmetic change: It would be great if this log.debug message can be changed to something of a proper standard, like "JSON Response : " i.e. Remove > > nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. --
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391550#comment-16391550 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173215603 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed" ++ "the livy json response flow file from previous Spark job is used to poll the job status" ++ "for sucess or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") --- End diff -- `displayName` is what will be rendered on the UI. So lets change it to JARs or Application JARs? 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391546#comment-16391546 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216966 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- This will be true all the time, right? 
> nifi-spark-bundle : Adding support for pyfiles, file, jars options > -- > > Key: NIFI-4946 > URL: https://issues.apache.org/jira/browse/NIFI-4946 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 > Environment: Ubuntu 16.04, IntelliJ >Reporter: Mageswaran >Priority: Major > Fix For: 1.6.0 > > Attachments: nifi-spark-options.png, nifi-spark.png > > > Adding support for submitting PySpark based Sparks jobs (which is normally > structured as modules) over Livy on existing "ExecuteSparkInteractive" > processor. > This is done by reading file paths for pyfiles and file and an option from > user whether the processor should trigger a batch job or not. > [https://livy.incubator.apache.org/docs/latest/rest-api.html] > *Current Work flow Logic ( [https://github.com/apache/nifi/pull/2521 > )|https://github.com/apache/nifi/pull/2521]* > * Check whether the processor has to handle code or submit a Spark job > * Read incoming flow file > ** If batch == true > *** If flow file matches Livy `batches` JSON response through `wait` loop > Wait for Status Check Interval > Read the state > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > *** Else > Ignore the flow file > Trigger the Spark job over Livy `batches` endpoint > Read the state of the submitted job > If state is `running` route it to `wait` or if it is `success` or > `dead` route it accordingly > ** Else: > *** Existing Logic to handle `Code` > > !nifi-spark-options.png! > !nifi-spark.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4946) nifi-spark-bundle : Adding support for pyfiles, file, jars options
[ https://issues.apache.org/jira/browse/NIFI-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16391547#comment-16391547 ] ASF GitHub Bot commented on NIFI-4946: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213566 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to 
failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map<String, String> headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216966 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { --- End diff -- This will be true all the time, right? ---
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173213566 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro String sessionId = livyController.get("sessionId"); String livyUrl = livyController.get("livyUrl"); -String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue(); -if (StringUtils.isEmpty(code)) { -try (InputStream inputStream = session.read(flowFile)) { -// If no code was provided, assume it is in the content of the incoming flow file -code = IOUtils.toString(inputStream, charset); -} catch (IOException ioe) { -log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); -flowFile = session.penalize(flowFile); -session.transfer(flowFile, REL_FAILURE); -return; -} -} -code = StringEscapeUtils.escapeJavaScript(code); -String payload = "{\"code\":\"" + code + "\"}"; + try { -final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval); -log.debug("ExecuteSparkInteractive Result of Job Submit: " + result); -if (result == null) { -session.transfer(flowFile, REL_FAILURE); -} else { + +if (isBatchJob) { + +String jsonResponse = null; + +if (StringUtils.isEmpty(jsonResponse)) { +try (InputStream inputStream = session.read(flowFile)) { +// If no code was provided, assume it is in the content of the incoming flow file +jsonResponse = IOUtils.toString(inputStream, charset); +} catch (IOException ioe) { +log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe); +flowFile = session.penalize(flowFile); +session.transfer(flowFile, REL_FAILURE); +return; +} +} + +log.debug(" > jsonResponse: " + 
jsonResponse); + try { -final JSONObject output = result.getJSONObject("data"); -flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes())); -flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -session.transfer(flowFile, REL_SUCCESS); -} catch (JSONException je) { -// The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing) -log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)"); -flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes())); + +final JSONObject jsonResponseObj = new JSONObject(jsonResponse); + +Map<String, String> headers = new HashMap<>(); +headers.put("Content-Type", LivySessionService.APPLICATION_JSON); +headers.put("X-Requested-By", LivySessionService.USER); +headers.put("Accept", "application/json"); + +JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers); + +flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes())); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON); -flowFile = session.penalize(flowFile); + +Thread.sleep(statusCheckInterval); + +String state = jobInfo.getString("state"); +log.debug(" > jsonResponseObj State: " + state); + +switch (state) { +case "success": +log.debug(" > success State: " + state); +session.transfer(flowFile, REL_SUCCESS); +
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r173216684 --- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java --- @@ -83,6 +83,62 @@ .expressionLanguageSupported(true) .build(); +public static final PropertyDescriptor IS_BATCH_JOB = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-is_batch_job") +.displayName("Is Batch Job") +.description("If true, the `Code` part is ignored and the flow file from previous stage is considered " ++ "as a triggering event and not as code for Spark session. When `Wait` state is self routed " ++ "the livy json response flow file from previous Spark job is used to poll the job status " ++ "for success or failure") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor PY_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-pyfiles") +.displayName("pyFiles") +.description("Python files to be used in this batch session that includes *.py, *.zip files") +.required(false) +.addValidator(StandardValidators.createURLorFileValidator()) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor JAR_FILES = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-jarfiles") +.displayName("jars") +.description("jars to be used in this batch session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-name") +.displayName("name") +.description("The name of this session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor ARGS = new PropertyDescriptor.Builder() 
+.name("exec-spark-iactive-args") +.displayName("args") +.description("The name of this session") +.required(false) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.expressionLanguageSupported(false) +.build(); + +public static final PropertyDescriptor MAIN_PY_FILE = new PropertyDescriptor.Builder() +.name("exec-spark-iactive-main-py-file") +.displayName("file") --- End diff -- Same as the JARs case. Most of the `PropertyDescriptor` use all lowercase characters for `displayName`. Please change it. ---